Merge tag 'batadv-next-pullrequest-20210408' of git://git.open-mesh.org/linux-merge
[linux-2.6-microblaze.git] / drivers / block / drbd / drbd_worker.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_worker.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12 */
13
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30
31 static int make_ov_request(struct drbd_device *, int);
32 static int make_resync_request(struct drbd_device *, int);
33
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52         struct drbd_device *device;
53
54         device = bio->bi_private;
55         device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57         /* special case: drbd_md_read() during drbd_adm_attach() */
58         if (device->ldev)
59                 put_ldev(device);
60         bio_put(bio);
61
62         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63          * to timeout on the lower level device, and eventually detach from it.
64          * If this io completion runs after that timeout expired, this
65          * drbd_md_put_buffer() may allow us to finally try and re-attach.
66          * During normal operation, this only puts that extra reference
67          * down to 1 again.
68          * Make sure we first drop the reference, and only then signal
69          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70          * next drbd_md_sync_page_io(), that we trigger the
71          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72          */
73         drbd_md_put_buffer(device);
74         device->md_io.done = 1;
75         wake_up(&device->misc_wait);
76 }
77
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83         unsigned long flags = 0;
84         struct drbd_peer_device *peer_device = peer_req->peer_device;
85         struct drbd_device *device = peer_device->device;
86
87         spin_lock_irqsave(&device->resource->req_lock, flags);
88         device->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&device->read_ee))
91                 wake_up(&device->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
94         spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97         put_ldev(device);
98 }
99
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104         unsigned long flags = 0;
105         struct drbd_peer_device *peer_device = peer_req->peer_device;
106         struct drbd_device *device = peer_device->device;
107         struct drbd_connection *connection = peer_device->connection;
108         struct drbd_interval i;
109         int do_wake;
110         u64 block_id;
111         int do_al_complete_io;
112
113         /* after we moved peer_req to done_ee,
114          * we may no longer access it,
115          * it may be freed/reused already!
116          * (as soon as we release the req_lock) */
117         i = peer_req->i;
118         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119         block_id = peer_req->block_id;
120         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122         if (peer_req->flags & EE_WAS_ERROR) {
123                 /* In protocol != C, we usually do not send write acks.
124                  * In case of a write error, send the neg ack anyways. */
125                 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126                         inc_unacked(device);
127                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
128         }
129
130         spin_lock_irqsave(&device->resource->req_lock, flags);
131         device->writ_cnt += peer_req->i.size >> 9;
132         list_move_tail(&peer_req->w.list, &device->done_ee);
133
134         /*
135          * Do not remove from the write_requests tree here: we did not send the
136          * Ack yet and did not wake possibly waiting conflicting requests.
137          * Removed from the tree from "drbd_process_done_ee" within the
138          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
139          * _drbd_clear_done_ee.
140          */
141
142         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144         /* FIXME do we want to detach for failed REQ_OP_DISCARD?
145          * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146         if (peer_req->flags & EE_WAS_ERROR)
147                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149         if (connection->cstate >= C_WF_REPORT_PARAMS) {
150                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152                         kref_put(&device->kref, drbd_destroy_device);
153         }
154         spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156         if (block_id == ID_SYNCER)
157                 drbd_rs_complete_io(device, i.sector);
158
159         if (do_wake)
160                 wake_up(&device->ee_wait);
161
162         if (do_al_complete_io)
163                 drbd_al_complete_io(device, &i);
164
165         put_ldev(device);
166 }
167
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173         struct drbd_peer_request *peer_req = bio->bi_private;
174         struct drbd_device *device = peer_req->peer_device->device;
175         bool is_write = bio_data_dir(bio) == WRITE;
176         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177                           bio_op(bio) == REQ_OP_DISCARD;
178
179         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_status,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_status)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201                 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which do no longer
220          * complete requests at all, not even do error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful swichover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we still will complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (__ratelimit(&drbd_ratelimit_state))
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275         bio_put(bio);
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         SHASH_DESC_ON_STACK(desc, tfm);
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293         void *src;
294
295         desc->tfm = tfm;
296
297         crypto_shash_init(desc);
298
299         src = kmap_atomic(page);
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 crypto_shash_update(desc, src, PAGE_SIZE);
303                 kunmap_atomic(src);
304                 page = tmp;
305                 src = kmap_atomic(page);
306         }
307         /* and now the last, possibly only partially used page */
308         len = peer_req->i.size & (PAGE_SIZE - 1);
309         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310         kunmap_atomic(src);
311
312         crypto_shash_final(desc, digest);
313         shash_desc_zero(desc);
314 }
315
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318         SHASH_DESC_ON_STACK(desc, tfm);
319         struct bio_vec bvec;
320         struct bvec_iter iter;
321
322         desc->tfm = tfm;
323
324         crypto_shash_init(desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 u8 *src;
328
329                 src = kmap_atomic(bvec.bv_page);
330                 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
331                 kunmap_atomic(src);
332
333                 /* REQ_OP_WRITE_SAME has only one segment,
334                  * checksum the payload only once. */
335                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
336                         break;
337         }
338         crypto_shash_final(desc, digest);
339         shash_desc_zero(desc);
340 }
341
342 /* MAYBE merge common code with w_e_end_ov_req */
343 static int w_e_send_csum(struct drbd_work *w, int cancel)
344 {
345         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
346         struct drbd_peer_device *peer_device = peer_req->peer_device;
347         struct drbd_device *device = peer_device->device;
348         int digest_size;
349         void *digest;
350         int err = 0;
351
352         if (unlikely(cancel))
353                 goto out;
354
355         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
356                 goto out;
357
358         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
359         digest = kmalloc(digest_size, GFP_NOIO);
360         if (digest) {
361                 sector_t sector = peer_req->i.sector;
362                 unsigned int size = peer_req->i.size;
363                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
364                 /* Free peer_req and pages before send.
365                  * In case we block on congestion, we could otherwise run into
366                  * some distributed deadlock, if the other side blocks on
367                  * congestion as well, because our receiver blocks in
368                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
369                 drbd_free_peer_req(device, peer_req);
370                 peer_req = NULL;
371                 inc_rs_pending(device);
372                 err = drbd_send_drequest_csum(peer_device, sector, size,
373                                               digest, digest_size,
374                                               P_CSUM_RS_REQUEST);
375                 kfree(digest);
376         } else {
377                 drbd_err(device, "kmalloc() of digest failed.\n");
378                 err = -ENOMEM;
379         }
380
381 out:
382         if (peer_req)
383                 drbd_free_peer_req(device, peer_req);
384
385         if (unlikely(err))
386                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
387         return err;
388 }
389
390 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
391
392 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
393 {
394         struct drbd_device *device = peer_device->device;
395         struct drbd_peer_request *peer_req;
396
397         if (!get_ldev(device))
398                 return -EIO;
399
400         /* GFP_TRY, because if there is no memory available right now, this may
401          * be rescheduled for later. It is "only" background resync, after all. */
402         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
403                                        size, size, GFP_TRY);
404         if (!peer_req)
405                 goto defer;
406
407         peer_req->w.cb = w_e_send_csum;
408         spin_lock_irq(&device->resource->req_lock);
409         list_add_tail(&peer_req->w.list, &device->read_ee);
410         spin_unlock_irq(&device->resource->req_lock);
411
412         atomic_add(size >> 9, &device->rs_sect_ev);
413         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
414                                      DRBD_FAULT_RS_RD) == 0)
415                 return 0;
416
417         /* If it failed because of ENOMEM, retry should help.  If it failed
418          * because bio_add_page failed (probably broken lower level driver),
419          * retry may or may not help.
420          * If it does not, you may need to force disconnect. */
421         spin_lock_irq(&device->resource->req_lock);
422         list_del(&peer_req->w.list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         drbd_free_peer_req(device, peer_req);
426 defer:
427         put_ldev(device);
428         return -EAGAIN;
429 }
430
431 int w_resync_timer(struct drbd_work *w, int cancel)
432 {
433         struct drbd_device *device =
434                 container_of(w, struct drbd_device, resync_work);
435
436         switch (device->state.conn) {
437         case C_VERIFY_S:
438                 make_ov_request(device, cancel);
439                 break;
440         case C_SYNC_TARGET:
441                 make_resync_request(device, cancel);
442                 break;
443         }
444
445         return 0;
446 }
447
448 void resync_timer_fn(struct timer_list *t)
449 {
450         struct drbd_device *device = from_timer(device, t, resync_timer);
451
452         drbd_queue_work_if_unqueued(
453                 &first_peer_device(device)->connection->sender_work,
454                 &device->resync_work);
455 }
456
457 static void fifo_set(struct fifo_buffer *fb, int value)
458 {
459         int i;
460
461         for (i = 0; i < fb->size; i++)
462                 fb->values[i] = value;
463 }
464
465 static int fifo_push(struct fifo_buffer *fb, int value)
466 {
467         int ov;
468
469         ov = fb->values[fb->head_index];
470         fb->values[fb->head_index++] = value;
471
472         if (fb->head_index >= fb->size)
473                 fb->head_index = 0;
474
475         return ov;
476 }
477
478 static void fifo_add_val(struct fifo_buffer *fb, int value)
479 {
480         int i;
481
482         for (i = 0; i < fb->size; i++)
483                 fb->values[i] += value;
484 }
485
486 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
487 {
488         struct fifo_buffer *fb;
489
490         fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
491         if (!fb)
492                 return NULL;
493
494         fb->head_index = 0;
495         fb->size = fifo_size;
496         fb->total = 0;
497
498         return fb;
499 }
500
501 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
502 {
503         struct disk_conf *dc;
504         unsigned int want;     /* The number of sectors we want in-flight */
505         int req_sect; /* Number of sectors to request in this turn */
506         int correction; /* Number of sectors more we need in-flight */
507         int cps; /* correction per invocation of drbd_rs_controller() */
508         int steps; /* Number of time steps to plan ahead */
509         int curr_corr;
510         int max_sect;
511         struct fifo_buffer *plan;
512
513         dc = rcu_dereference(device->ldev->disk_conf);
514         plan = rcu_dereference(device->rs_plan_s);
515
516         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
517
518         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
520         } else { /* normal path */
521                 want = dc->c_fill_target ? dc->c_fill_target :
522                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
523         }
524
525         correction = want - device->rs_in_flight - plan->total;
526
527         /* Plan ahead */
528         cps = correction / steps;
529         fifo_add_val(plan, cps);
530         plan->total += cps * steps;
531
532         /* What we do in this step */
533         curr_corr = fifo_push(plan, 0);
534         plan->total -= curr_corr;
535
536         req_sect = sect_in + curr_corr;
537         if (req_sect < 0)
538                 req_sect = 0;
539
540         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541         if (req_sect > max_sect)
542                 req_sect = max_sect;
543
544         /*
545         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546                  sect_in, device->rs_in_flight, want, correction,
547                  steps, cps, device->rs_planed, curr_corr, req_sect);
548         */
549
550         return req_sect;
551 }
552
553 static int drbd_rs_number_requests(struct drbd_device *device)
554 {
555         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
556         int number, mxb;
557
558         sect_in = atomic_xchg(&device->rs_sect_in, 0);
559         device->rs_in_flight -= sect_in;
560
561         rcu_read_lock();
562         mxb = drbd_get_max_buffers(device) / 2;
563         if (rcu_dereference(device->rs_plan_s)->size) {
564                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
566         } else {
567                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
569         }
570         rcu_read_unlock();
571
572         /* Don't have more than "max-buffers"/2 in-flight.
573          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574          * potentially causing a distributed deadlock on congestion during
575          * online-verify or (checksum-based) resync, if max-buffers,
576          * socket buffer sizes and resync rate settings are mis-configured. */
577
578         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579          * mxb (as used here, and in drbd_alloc_pages on the peer) is
580          * "number of pages" (typically also 4k),
581          * but "rs_in_flight" is in "sectors" (512 Byte). */
582         if (mxb - device->rs_in_flight/8 < number)
583                 number = mxb - device->rs_in_flight/8;
584
585         return number;
586 }
587
588 static int make_resync_request(struct drbd_device *const device, int cancel)
589 {
590         struct drbd_peer_device *const peer_device = first_peer_device(device);
591         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
592         unsigned long bit;
593         sector_t sector;
594         const sector_t capacity = get_capacity(device->vdisk);
595         int max_bio_size;
596         int number, rollback_i, size;
597         int align, requeue = 0;
598         int i = 0;
599         int discard_granularity = 0;
600
601         if (unlikely(cancel))
602                 return 0;
603
604         if (device->rs_total == 0) {
605                 /* empty resync? */
606                 drbd_resync_finished(device);
607                 return 0;
608         }
609
610         if (!get_ldev(device)) {
611                 /* Since we only need to access device->rsync a
612                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
613                    to continue resync with a broken disk makes no sense at
614                    all */
615                 drbd_err(device, "Disk broke down during resync!\n");
616                 return 0;
617         }
618
619         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
620                 rcu_read_lock();
621                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
622                 rcu_read_unlock();
623         }
624
625         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
626         number = drbd_rs_number_requests(device);
627         if (number <= 0)
628                 goto requeue;
629
630         for (i = 0; i < number; i++) {
631                 /* Stop generating RS requests when half of the send buffer is filled,
632                  * but notify TCP that we'd like to have more space. */
633                 mutex_lock(&connection->data.mutex);
634                 if (connection->data.socket) {
635                         struct sock *sk = connection->data.socket->sk;
636                         int queued = sk->sk_wmem_queued;
637                         int sndbuf = sk->sk_sndbuf;
638                         if (queued > sndbuf / 2) {
639                                 requeue = 1;
640                                 if (sk->sk_socket)
641                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
642                         }
643                 } else
644                         requeue = 1;
645                 mutex_unlock(&connection->data.mutex);
646                 if (requeue)
647                         goto requeue;
648
649 next_sector:
650                 size = BM_BLOCK_SIZE;
651                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
652
653                 if (bit == DRBD_END_OF_BITMAP) {
654                         device->bm_resync_fo = drbd_bm_bits(device);
655                         put_ldev(device);
656                         return 0;
657                 }
658
659                 sector = BM_BIT_TO_SECT(bit);
660
661                 if (drbd_try_rs_begin_io(device, sector)) {
662                         device->bm_resync_fo = bit;
663                         goto requeue;
664                 }
665                 device->bm_resync_fo = bit + 1;
666
667                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
668                         drbd_rs_complete_io(device, sector);
669                         goto next_sector;
670                 }
671
672 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
673                 /* try to find some adjacent bits.
674                  * we stop if we have already the maximum req size.
675                  *
676                  * Additionally always align bigger requests, in order to
677                  * be prepared for all stripe sizes of software RAIDs.
678                  */
679                 align = 1;
680                 rollback_i = i;
681                 while (i < number) {
682                         if (size + BM_BLOCK_SIZE > max_bio_size)
683                                 break;
684
685                         /* Be always aligned */
686                         if (sector & ((1<<(align+3))-1))
687                                 break;
688
689                         if (discard_granularity && size == discard_granularity)
690                                 break;
691
692                         /* do not cross extent boundaries */
693                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
694                                 break;
695                         /* now, is it actually dirty, after all?
696                          * caution, drbd_bm_test_bit is tri-state for some
697                          * obscure reason; ( b == 0 ) would get the out-of-band
698                          * only accidentally right because of the "oddly sized"
699                          * adjustment below */
700                         if (drbd_bm_test_bit(device, bit+1) != 1)
701                                 break;
702                         bit++;
703                         size += BM_BLOCK_SIZE;
704                         if ((BM_BLOCK_SIZE << align) <= size)
705                                 align++;
706                         i++;
707                 }
708                 /* if we merged some,
709                  * reset the offset to start the next drbd_bm_find_next from */
710                 if (size > BM_BLOCK_SIZE)
711                         device->bm_resync_fo = bit + 1;
712 #endif
713
714                 /* adjust very last sectors, in case we are oddly sized */
715                 if (sector + (size>>9) > capacity)
716                         size = (capacity-sector)<<9;
717
718                 if (device->use_csums) {
719                         switch (read_for_csum(peer_device, sector, size)) {
720                         case -EIO: /* Disk failure */
721                                 put_ldev(device);
722                                 return -EIO;
723                         case -EAGAIN: /* allocation failed, or ldev busy */
724                                 drbd_rs_complete_io(device, sector);
725                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
726                                 i = rollback_i;
727                                 goto requeue;
728                         case 0:
729                                 /* everything ok */
730                                 break;
731                         default:
732                                 BUG();
733                         }
734                 } else {
735                         int err;
736
737                         inc_rs_pending(device);
738                         err = drbd_send_drequest(peer_device,
739                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
740                                                  sector, size, ID_SYNCER);
741                         if (err) {
742                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
743                                 dec_rs_pending(device);
744                                 put_ldev(device);
745                                 return err;
746                         }
747                 }
748         }
749
750         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
751                 /* last syncer _request_ was sent,
752                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
753                  * next sync group will resume), as soon as we receive the last
754                  * resync data block, and the last bit is cleared.
755                  * until then resync "work" is "inactive" ...
756                  */
757                 put_ldev(device);
758                 return 0;
759         }
760
761  requeue:
762         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
763         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
764         put_ldev(device);
765         return 0;
766 }
767
768 static int make_ov_request(struct drbd_device *device, int cancel)
769 {
770         int number, i, size;
771         sector_t sector;
772         const sector_t capacity = get_capacity(device->vdisk);
773         bool stop_sector_reached = false;
774
775         if (unlikely(cancel))
776                 return 1;
777
778         number = drbd_rs_number_requests(device);
779
780         sector = device->ov_position;
781         for (i = 0; i < number; i++) {
782                 if (sector >= capacity)
783                         return 1;
784
785                 /* We check for "finished" only in the reply path:
786                  * w_e_end_ov_reply().
787                  * We need to send at least one request out. */
788                 stop_sector_reached = i > 0
789                         && verify_can_do_stop_sector(device)
790                         && sector >= device->ov_stop_sector;
791                 if (stop_sector_reached)
792                         break;
793
794                 size = BM_BLOCK_SIZE;
795
796                 if (drbd_try_rs_begin_io(device, sector)) {
797                         device->ov_position = sector;
798                         goto requeue;
799                 }
800
801                 if (sector + (size>>9) > capacity)
802                         size = (capacity-sector)<<9;
803
804                 inc_rs_pending(device);
805                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
806                         dec_rs_pending(device);
807                         return 0;
808                 }
809                 sector += BM_SECT_PER_BIT;
810         }
811         device->ov_position = sector;
812
813  requeue:
814         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
815         if (i == 0 || !stop_sector_reached)
816                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
817         return 1;
818 }
819
820 int w_ov_finished(struct drbd_work *w, int cancel)
821 {
822         struct drbd_device_work *dw =
823                 container_of(w, struct drbd_device_work, w);
824         struct drbd_device *device = dw->device;
825         kfree(dw);
826         ov_out_of_sync_print(device);
827         drbd_resync_finished(device);
828
829         return 0;
830 }
831
832 static int w_resync_finished(struct drbd_work *w, int cancel)
833 {
834         struct drbd_device_work *dw =
835                 container_of(w, struct drbd_device_work, w);
836         struct drbd_device *device = dw->device;
837         kfree(dw);
838
839         drbd_resync_finished(device);
840
841         return 0;
842 }
843
844 static void ping_peer(struct drbd_device *device)
845 {
846         struct drbd_connection *connection = first_peer_device(device)->connection;
847
848         clear_bit(GOT_PING_ACK, &connection->flags);
849         request_ping(connection);
850         wait_event(connection->ping_wait,
851                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
852 }
853
854 int drbd_resync_finished(struct drbd_device *device)
855 {
856         struct drbd_connection *connection = first_peer_device(device)->connection;
857         unsigned long db, dt, dbdt;
858         unsigned long n_oos;
859         union drbd_state os, ns;
860         struct drbd_device_work *dw;
861         char *khelper_cmd = NULL;
862         int verify_done = 0;
863
864         /* Remove all elements from the resync LRU. Since future actions
865          * might set bits in the (main) bitmap, then the entries in the
866          * resync LRU would be wrong. */
867         if (drbd_rs_del_all(device)) {
868                 /* In case this is not possible now, most probably because
869                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
870                  * queue (or even the read operations for those packets
871                  * is not finished by now).   Retry in 100ms. */
872
873                 schedule_timeout_interruptible(HZ / 10);
874                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875                 if (dw) {
876                         dw->w.cb = w_resync_finished;
877                         dw->device = device;
878                         drbd_queue_work(&connection->sender_work, &dw->w);
879                         return 1;
880                 }
881                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882         }
883
884         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885         if (dt <= 0)
886                 dt = 1;
887
888         db = device->rs_total;
889         /* adjust for verify start and stop sectors, respective reached position */
890         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891                 db -= device->ov_left;
892
893         dbdt = Bit2KB(db/dt);
894         device->rs_paused /= HZ;
895
896         if (!get_ldev(device))
897                 goto out;
898
899         ping_peer(device);
900
901         spin_lock_irq(&device->resource->req_lock);
902         os = drbd_read_state(device);
903
904         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905
906         /* This protects us against multiple calls (that can happen in the presence
907            of application IO), and against connectivity loss just before we arrive here. */
908         if (os.conn <= C_CONNECTED)
909                 goto out_unlock;
910
911         ns = os;
912         ns.conn = C_CONNECTED;
913
914         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915              verify_done ? "Online verify" : "Resync",
916              dt + device->rs_paused, device->rs_paused, dbdt);
917
918         n_oos = drbd_bm_total_weight(device);
919
920         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921                 if (n_oos) {
922                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923                               n_oos, Bit2KB(1));
924                         khelper_cmd = "out-of-sync";
925                 }
926         } else {
927                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928
929                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930                         khelper_cmd = "after-resync-target";
931
932                 if (device->use_csums && device->rs_total) {
933                         const unsigned long s = device->rs_same_csum;
934                         const unsigned long t = device->rs_total;
935                         const int ratio =
936                                 (t == 0)     ? 0 :
937                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
938                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939                              "transferred %luK total %luK\n",
940                              ratio,
941                              Bit2KB(device->rs_same_csum),
942                              Bit2KB(device->rs_total - device->rs_same_csum),
943                              Bit2KB(device->rs_total));
944                 }
945         }
946
947         if (device->rs_failed) {
948                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949
950                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951                         ns.disk = D_INCONSISTENT;
952                         ns.pdsk = D_UP_TO_DATE;
953                 } else {
954                         ns.disk = D_UP_TO_DATE;
955                         ns.pdsk = D_INCONSISTENT;
956                 }
957         } else {
958                 ns.disk = D_UP_TO_DATE;
959                 ns.pdsk = D_UP_TO_DATE;
960
961                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962                         if (device->p_uuid) {
963                                 int i;
964                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
966                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968                         } else {
969                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
970                         }
971                 }
972
973                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974                         /* for verify runs, we don't update uuids here,
975                          * so there would be nothing to report. */
976                         drbd_uuid_set_bm(device, 0UL);
977                         drbd_print_uuids(device, "updated UUIDs");
978                         if (device->p_uuid) {
979                                 /* Now the two UUID sets are equal, update what we
980                                  * know of the peer. */
981                                 int i;
982                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983                                         device->p_uuid[i] = device->ldev->md.uuid[i];
984                         }
985                 }
986         }
987
988         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
989 out_unlock:
990         spin_unlock_irq(&device->resource->req_lock);
991
992         /* If we have been sync source, and have an effective fencing-policy,
993          * once *all* volumes are back in sync, call "unfence". */
994         if (os.conn == C_SYNC_SOURCE) {
995                 enum drbd_disk_state disk_state = D_MASK;
996                 enum drbd_disk_state pdsk_state = D_MASK;
997                 enum drbd_fencing_p fp = FP_DONT_CARE;
998
999                 rcu_read_lock();
1000                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001                 if (fp != FP_DONT_CARE) {
1002                         struct drbd_peer_device *peer_device;
1003                         int vnr;
1004                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005                                 struct drbd_device *device = peer_device->device;
1006                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008                         }
1009                 }
1010                 rcu_read_unlock();
1011                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012                         conn_khelper(connection, "unfence-peer");
1013         }
1014
1015         put_ldev(device);
1016 out:
1017         device->rs_total  = 0;
1018         device->rs_failed = 0;
1019         device->rs_paused = 0;
1020
1021         /* reset start sector, if we reached end of device */
1022         if (verify_done && device->ov_left == 0)
1023                 device->ov_start_sector = 0;
1024
1025         drbd_md_sync(device);
1026
1027         if (khelper_cmd)
1028                 drbd_khelper(device, khelper_cmd);
1029
1030         return 1;
1031 }
1032
1033 /* helper */
1034 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035 {
1036         if (drbd_peer_req_has_active_page(peer_req)) {
1037                 /* This might happen if sendpage() has not finished */
1038                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1039                 atomic_add(i, &device->pp_in_use_by_net);
1040                 atomic_sub(i, &device->pp_in_use);
1041                 spin_lock_irq(&device->resource->req_lock);
1042                 list_add_tail(&peer_req->w.list, &device->net_ee);
1043                 spin_unlock_irq(&device->resource->req_lock);
1044                 wake_up(&drbd_pp_wait);
1045         } else
1046                 drbd_free_peer_req(device, peer_req);
1047 }
1048
1049 /**
1050  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051  * @w:          work object.
1052  * @cancel:     The connection will be closed anyways
1053  */
1054 int w_e_end_data_req(struct drbd_work *w, int cancel)
1055 {
1056         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057         struct drbd_peer_device *peer_device = peer_req->peer_device;
1058         struct drbd_device *device = peer_device->device;
1059         int err;
1060
1061         if (unlikely(cancel)) {
1062                 drbd_free_peer_req(device, peer_req);
1063                 dec_unacked(device);
1064                 return 0;
1065         }
1066
1067         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069         } else {
1070                 if (__ratelimit(&drbd_ratelimit_state))
1071                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072                             (unsigned long long)peer_req->i.sector);
1073
1074                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075         }
1076
1077         dec_unacked(device);
1078
1079         move_to_net_ee_or_free(device, peer_req);
1080
1081         if (unlikely(err))
1082                 drbd_err(device, "drbd_send_block() failed\n");
1083         return err;
1084 }
1085
1086 static bool all_zero(struct drbd_peer_request *peer_req)
1087 {
1088         struct page *page = peer_req->pages;
1089         unsigned int len = peer_req->i.size;
1090
1091         page_chain_for_each(page) {
1092                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093                 unsigned int i, words = l / sizeof(long);
1094                 unsigned long *d;
1095
1096                 d = kmap_atomic(page);
1097                 for (i = 0; i < words; i++) {
1098                         if (d[i]) {
1099                                 kunmap_atomic(d);
1100                                 return false;
1101                         }
1102                 }
1103                 kunmap_atomic(d);
1104                 len -= l;
1105         }
1106
1107         return true;
1108 }
1109
1110 /**
1111  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112  * @w:          work object.
1113  * @cancel:     The connection will be closed anyways
1114  */
1115 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116 {
1117         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118         struct drbd_peer_device *peer_device = peer_req->peer_device;
1119         struct drbd_device *device = peer_device->device;
1120         int err;
1121
1122         if (unlikely(cancel)) {
1123                 drbd_free_peer_req(device, peer_req);
1124                 dec_unacked(device);
1125                 return 0;
1126         }
1127
1128         if (get_ldev_if_state(device, D_FAILED)) {
1129                 drbd_rs_complete_io(device, peer_req->i.sector);
1130                 put_ldev(device);
1131         }
1132
1133         if (device->state.conn == C_AHEAD) {
1134                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137                         inc_rs_pending(device);
1138                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1140                         else
1141                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142                 } else {
1143                         if (__ratelimit(&drbd_ratelimit_state))
1144                                 drbd_err(device, "Not sending RSDataReply, "
1145                                     "partner DISKLESS!\n");
1146                         err = 0;
1147                 }
1148         } else {
1149                 if (__ratelimit(&drbd_ratelimit_state))
1150                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151                             (unsigned long long)peer_req->i.sector);
1152
1153                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154
1155                 /* update resync data with failure */
1156                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1157         }
1158
1159         dec_unacked(device);
1160
1161         move_to_net_ee_or_free(device, peer_req);
1162
1163         if (unlikely(err))
1164                 drbd_err(device, "drbd_send_block() failed\n");
1165         return err;
1166 }
1167
1168 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169 {
1170         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171         struct drbd_peer_device *peer_device = peer_req->peer_device;
1172         struct drbd_device *device = peer_device->device;
1173         struct digest_info *di;
1174         int digest_size;
1175         void *digest = NULL;
1176         int err, eq = 0;
1177
1178         if (unlikely(cancel)) {
1179                 drbd_free_peer_req(device, peer_req);
1180                 dec_unacked(device);
1181                 return 0;
1182         }
1183
1184         if (get_ldev(device)) {
1185                 drbd_rs_complete_io(device, peer_req->i.sector);
1186                 put_ldev(device);
1187         }
1188
1189         di = peer_req->digest;
1190
1191         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192                 /* quick hack to try to avoid a race against reconfiguration.
1193                  * a real fix would be much more involved,
1194                  * introducing more locking mechanisms */
1195                 if (peer_device->connection->csums_tfm) {
1196                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197                         D_ASSERT(device, digest_size == di->digest_size);
1198                         digest = kmalloc(digest_size, GFP_NOIO);
1199                 }
1200                 if (digest) {
1201                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202                         eq = !memcmp(digest, di->digest, digest_size);
1203                         kfree(digest);
1204                 }
1205
1206                 if (eq) {
1207                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1208                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1209                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211                 } else {
1212                         inc_rs_pending(device);
1213                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215                         kfree(di);
1216                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217                 }
1218         } else {
1219                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220                 if (__ratelimit(&drbd_ratelimit_state))
1221                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222         }
1223
1224         dec_unacked(device);
1225         move_to_net_ee_or_free(device, peer_req);
1226
1227         if (unlikely(err))
1228                 drbd_err(device, "drbd_send_block/ack() failed\n");
1229         return err;
1230 }
1231
1232 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233 {
1234         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235         struct drbd_peer_device *peer_device = peer_req->peer_device;
1236         struct drbd_device *device = peer_device->device;
1237         sector_t sector = peer_req->i.sector;
1238         unsigned int size = peer_req->i.size;
1239         int digest_size;
1240         void *digest;
1241         int err = 0;
1242
1243         if (unlikely(cancel))
1244                 goto out;
1245
1246         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247         digest = kmalloc(digest_size, GFP_NOIO);
1248         if (!digest) {
1249                 err = 1;        /* terminate the connection in case the allocation failed */
1250                 goto out;
1251         }
1252
1253         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255         else
1256                 memset(digest, 0, digest_size);
1257
1258         /* Free e and pages before send.
1259          * In case we block on congestion, we could otherwise run into
1260          * some distributed deadlock, if the other side blocks on
1261          * congestion as well, because our receiver blocks in
1262          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263         drbd_free_peer_req(device, peer_req);
1264         peer_req = NULL;
1265         inc_rs_pending(device);
1266         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267         if (err)
1268                 dec_rs_pending(device);
1269         kfree(digest);
1270
1271 out:
1272         if (peer_req)
1273                 drbd_free_peer_req(device, peer_req);
1274         dec_unacked(device);
1275         return err;
1276 }
1277
1278 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1279 {
1280         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281                 device->ov_last_oos_size += size>>9;
1282         } else {
1283                 device->ov_last_oos_start = sector;
1284                 device->ov_last_oos_size = size>>9;
1285         }
1286         drbd_set_out_of_sync(device, sector, size);
1287 }
1288
1289 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292         struct drbd_peer_device *peer_device = peer_req->peer_device;
1293         struct drbd_device *device = peer_device->device;
1294         struct digest_info *di;
1295         void *digest;
1296         sector_t sector = peer_req->i.sector;
1297         unsigned int size = peer_req->i.size;
1298         int digest_size;
1299         int err, eq = 0;
1300         bool stop_sector_reached = false;
1301
1302         if (unlikely(cancel)) {
1303                 drbd_free_peer_req(device, peer_req);
1304                 dec_unacked(device);
1305                 return 0;
1306         }
1307
1308         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309          * the resync lru has been cleaned up already */
1310         if (get_ldev(device)) {
1311                 drbd_rs_complete_io(device, peer_req->i.sector);
1312                 put_ldev(device);
1313         }
1314
1315         di = peer_req->digest;
1316
1317         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319                 digest = kmalloc(digest_size, GFP_NOIO);
1320                 if (digest) {
1321                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323                         D_ASSERT(device, digest_size == di->digest_size);
1324                         eq = !memcmp(digest, di->digest, digest_size);
1325                         kfree(digest);
1326                 }
1327         }
1328
1329         /* Free peer_req and pages before send.
1330          * In case we block on congestion, we could otherwise run into
1331          * some distributed deadlock, if the other side blocks on
1332          * congestion as well, because our receiver blocks in
1333          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334         drbd_free_peer_req(device, peer_req);
1335         if (!eq)
1336                 drbd_ov_out_of_sync_found(device, sector, size);
1337         else
1338                 ov_out_of_sync_print(device);
1339
1340         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343         dec_unacked(device);
1344
1345         --device->ov_left;
1346
1347         /* let's advance progress step marks only for every other megabyte */
1348         if ((device->ov_left & 0x200) == 0x200)
1349                 drbd_advance_rs_marks(device, device->ov_left);
1350
1351         stop_sector_reached = verify_can_do_stop_sector(device) &&
1352                 (sector + (size>>9)) >= device->ov_stop_sector;
1353
1354         if (device->ov_left == 0 || stop_sector_reached) {
1355                 ov_out_of_sync_print(device);
1356                 drbd_resync_finished(device);
1357         }
1358
1359         return err;
1360 }
1361
1362 /* FIXME
1363  * We need to track the number of pending barrier acks,
1364  * and to be able to wait for them.
1365  * See also comment in drbd_adm_attach before drbd_suspend_io.
1366  */
1367 static int drbd_send_barrier(struct drbd_connection *connection)
1368 {
1369         struct p_barrier *p;
1370         struct drbd_socket *sock;
1371
1372         sock = &connection->data;
1373         p = conn_prepare_command(connection, sock);
1374         if (!p)
1375                 return -EIO;
1376         p->barrier = connection->send.current_epoch_nr;
1377         p->pad = 0;
1378         connection->send.current_epoch_writes = 0;
1379         connection->send.last_sent_barrier_jif = jiffies;
1380
1381         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382 }
1383
1384 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385 {
1386         struct drbd_socket *sock = &pd->connection->data;
1387         if (!drbd_prepare_command(pd, sock))
1388                 return -EIO;
1389         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390 }
1391
1392 int w_send_write_hint(struct drbd_work *w, int cancel)
1393 {
1394         struct drbd_device *device =
1395                 container_of(w, struct drbd_device, unplug_work);
1396
1397         if (cancel)
1398                 return 0;
1399         return pd_send_unplug_remote(first_peer_device(device));
1400 }
1401
1402 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403 {
1404         if (!connection->send.seen_any_write_yet) {
1405                 connection->send.seen_any_write_yet = true;
1406                 connection->send.current_epoch_nr = epoch;
1407                 connection->send.current_epoch_writes = 0;
1408                 connection->send.last_sent_barrier_jif = jiffies;
1409         }
1410 }
1411
1412 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413 {
1414         /* re-init if first write on this connection */
1415         if (!connection->send.seen_any_write_yet)
1416                 return;
1417         if (connection->send.current_epoch_nr != epoch) {
1418                 if (connection->send.current_epoch_writes)
1419                         drbd_send_barrier(connection);
1420                 connection->send.current_epoch_nr = epoch;
1421         }
1422 }
1423
1424 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425 {
1426         struct drbd_request *req = container_of(w, struct drbd_request, w);
1427         struct drbd_device *device = req->device;
1428         struct drbd_peer_device *const peer_device = first_peer_device(device);
1429         struct drbd_connection *const connection = peer_device->connection;
1430         int err;
1431
1432         if (unlikely(cancel)) {
1433                 req_mod(req, SEND_CANCELED);
1434                 return 0;
1435         }
1436         req->pre_send_jif = jiffies;
1437
1438         /* this time, no connection->send.current_epoch_writes++;
1439          * If it was sent, it was the closing barrier for the last
1440          * replicated epoch, before we went into AHEAD mode.
1441          * No more barriers will be sent, until we leave AHEAD mode again. */
1442         maybe_send_barrier(connection, req->epoch);
1443
1444         err = drbd_send_out_of_sync(peer_device, req);
1445         req_mod(req, OOS_HANDED_TO_NETWORK);
1446
1447         return err;
1448 }
1449
1450 /**
1451  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452  * @w:          work object.
1453  * @cancel:     The connection will be closed anyways
1454  */
1455 int w_send_dblock(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_request *req = container_of(w, struct drbd_request, w);
1458         struct drbd_device *device = req->device;
1459         struct drbd_peer_device *const peer_device = first_peer_device(device);
1460         struct drbd_connection *connection = peer_device->connection;
1461         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462         int err;
1463
1464         if (unlikely(cancel)) {
1465                 req_mod(req, SEND_CANCELED);
1466                 return 0;
1467         }
1468         req->pre_send_jif = jiffies;
1469
1470         re_init_if_first_write(connection, req->epoch);
1471         maybe_send_barrier(connection, req->epoch);
1472         connection->send.current_epoch_writes++;
1473
1474         err = drbd_send_dblock(peer_device, req);
1475         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1476
1477         if (do_send_unplug && !err)
1478                 pd_send_unplug_remote(peer_device);
1479
1480         return err;
1481 }
1482
1483 /**
1484  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485  * @w:          work object.
1486  * @cancel:     The connection will be closed anyways
1487  */
1488 int w_send_read_req(struct drbd_work *w, int cancel)
1489 {
1490         struct drbd_request *req = container_of(w, struct drbd_request, w);
1491         struct drbd_device *device = req->device;
1492         struct drbd_peer_device *const peer_device = first_peer_device(device);
1493         struct drbd_connection *connection = peer_device->connection;
1494         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495         int err;
1496
1497         if (unlikely(cancel)) {
1498                 req_mod(req, SEND_CANCELED);
1499                 return 0;
1500         }
1501         req->pre_send_jif = jiffies;
1502
1503         /* Even read requests may close a write epoch,
1504          * if there was any yet. */
1505         maybe_send_barrier(connection, req->epoch);
1506
1507         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508                                  (unsigned long)req);
1509
1510         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1511
1512         if (do_send_unplug && !err)
1513                 pd_send_unplug_remote(peer_device);
1514
1515         return err;
1516 }
1517
1518 int w_restart_disk_io(struct drbd_work *w, int cancel)
1519 {
1520         struct drbd_request *req = container_of(w, struct drbd_request, w);
1521         struct drbd_device *device = req->device;
1522
1523         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524                 drbd_al_begin_io(device, &req->i);
1525
1526         req->private_bio = bio_clone_fast(req->master_bio, GFP_NOIO,
1527                                           &drbd_io_bio_set);
1528         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1529         req->private_bio->bi_private = req;
1530         req->private_bio->bi_end_io = drbd_request_endio;
1531         submit_bio_noacct(req->private_bio);
1532
1533         return 0;
1534 }
1535
1536 static int _drbd_may_sync_now(struct drbd_device *device)
1537 {
1538         struct drbd_device *odev = device;
1539         int resync_after;
1540
1541         while (1) {
1542                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1543                         return 1;
1544                 rcu_read_lock();
1545                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1546                 rcu_read_unlock();
1547                 if (resync_after == -1)
1548                         return 1;
1549                 odev = minor_to_device(resync_after);
1550                 if (!odev)
1551                         return 1;
1552                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1553                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1554                     odev->state.aftr_isp || odev->state.peer_isp ||
1555                     odev->state.user_isp)
1556                         return 0;
1557         }
1558 }
1559
1560 /**
1561  * drbd_pause_after() - Pause resync on all devices that may not resync now
1562  * @device:     DRBD device.
1563  *
1564  * Called from process context only (admin command and after_state_ch).
1565  */
1566 static bool drbd_pause_after(struct drbd_device *device)
1567 {
1568         bool changed = false;
1569         struct drbd_device *odev;
1570         int i;
1571
1572         rcu_read_lock();
1573         idr_for_each_entry(&drbd_devices, odev, i) {
1574                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1575                         continue;
1576                 if (!_drbd_may_sync_now(odev) &&
1577                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1578                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1579                         changed = true;
1580         }
1581         rcu_read_unlock();
1582
1583         return changed;
1584 }
1585
1586 /**
1587  * drbd_resume_next() - Resume resync on all devices that may resync now
1588  * @device:     DRBD device.
1589  *
1590  * Called from process context only (admin command and worker).
1591  */
1592 static bool drbd_resume_next(struct drbd_device *device)
1593 {
1594         bool changed = false;
1595         struct drbd_device *odev;
1596         int i;
1597
1598         rcu_read_lock();
1599         idr_for_each_entry(&drbd_devices, odev, i) {
1600                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1601                         continue;
1602                 if (odev->state.aftr_isp) {
1603                         if (_drbd_may_sync_now(odev) &&
1604                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1605                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1606                                 changed = true;
1607                 }
1608         }
1609         rcu_read_unlock();
1610         return changed;
1611 }
1612
1613 void resume_next_sg(struct drbd_device *device)
1614 {
1615         lock_all_resources();
1616         drbd_resume_next(device);
1617         unlock_all_resources();
1618 }
1619
1620 void suspend_other_sg(struct drbd_device *device)
1621 {
1622         lock_all_resources();
1623         drbd_pause_after(device);
1624         unlock_all_resources();
1625 }
1626
1627 /* caller must lock_all_resources() */
1628 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1629 {
1630         struct drbd_device *odev;
1631         int resync_after;
1632
1633         if (o_minor == -1)
1634                 return NO_ERROR;
1635         if (o_minor < -1 || o_minor > MINORMASK)
1636                 return ERR_RESYNC_AFTER;
1637
1638         /* check for loops */
1639         odev = minor_to_device(o_minor);
1640         while (1) {
1641                 if (odev == device)
1642                         return ERR_RESYNC_AFTER_CYCLE;
1643
1644                 /* You are free to depend on diskless, non-existing,
1645                  * or not yet/no longer existing minors.
1646                  * We only reject dependency loops.
1647                  * We cannot follow the dependency chain beyond a detached or
1648                  * missing minor.
1649                  */
1650                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1651                         return NO_ERROR;
1652
1653                 rcu_read_lock();
1654                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1655                 rcu_read_unlock();
1656                 /* dependency chain ends here, no cycles. */
1657                 if (resync_after == -1)
1658                         return NO_ERROR;
1659
1660                 /* follow the dependency chain */
1661                 odev = minor_to_device(resync_after);
1662         }
1663 }
1664
1665 /* caller must lock_all_resources() */
1666 void drbd_resync_after_changed(struct drbd_device *device)
1667 {
1668         int changed;
1669
1670         do {
1671                 changed  = drbd_pause_after(device);
1672                 changed |= drbd_resume_next(device);
1673         } while (changed);
1674 }
1675
1676 void drbd_rs_controller_reset(struct drbd_device *device)
1677 {
1678         struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1679         struct fifo_buffer *plan;
1680
1681         atomic_set(&device->rs_sect_in, 0);
1682         atomic_set(&device->rs_sect_ev, 0);
1683         device->rs_in_flight = 0;
1684         device->rs_last_events =
1685                 (int)part_stat_read_accum(disk->part0, sectors);
1686
1687         /* Updating the RCU protected object in place is necessary since
1688            this function gets called from atomic context.
1689            It is valid since all other updates also lead to an completely
1690            empty fifo */
1691         rcu_read_lock();
1692         plan = rcu_dereference(device->rs_plan_s);
1693         plan->total = 0;
1694         fifo_set(plan, 0);
1695         rcu_read_unlock();
1696 }
1697
1698 void start_resync_timer_fn(struct timer_list *t)
1699 {
1700         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1701         drbd_device_post_work(device, RS_START);
1702 }
1703
1704 static void do_start_resync(struct drbd_device *device)
1705 {
1706         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1707                 drbd_warn(device, "postponing start_resync ...\n");
1708                 device->start_resync_timer.expires = jiffies + HZ/10;
1709                 add_timer(&device->start_resync_timer);
1710                 return;
1711         }
1712
1713         drbd_start_resync(device, C_SYNC_SOURCE);
1714         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1715 }
1716
1717 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1718 {
1719         bool csums_after_crash_only;
1720         rcu_read_lock();
1721         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1722         rcu_read_unlock();
1723         return connection->agreed_pro_version >= 89 &&          /* supported? */
1724                 connection->csums_tfm &&                        /* configured? */
1725                 (csums_after_crash_only == false                /* use for each resync? */
1726                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1727 }
1728
1729 /**
1730  * drbd_start_resync() - Start the resync process
1731  * @device:     DRBD device.
1732  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1733  *
1734  * This function might bring you directly into one of the
1735  * C_PAUSED_SYNC_* states.
1736  */
1737 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1738 {
1739         struct drbd_peer_device *peer_device = first_peer_device(device);
1740         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1741         union drbd_state ns;
1742         int r;
1743
1744         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1745                 drbd_err(device, "Resync already running!\n");
1746                 return;
1747         }
1748
1749         if (!connection) {
1750                 drbd_err(device, "No connection to peer, aborting!\n");
1751                 return;
1752         }
1753
1754         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1755                 if (side == C_SYNC_TARGET) {
1756                         /* Since application IO was locked out during C_WF_BITMAP_T and
1757                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1758                            we check that we might make the data inconsistent. */
1759                         r = drbd_khelper(device, "before-resync-target");
1760                         r = (r >> 8) & 0xff;
1761                         if (r > 0) {
1762                                 drbd_info(device, "before-resync-target handler returned %d, "
1763                                          "dropping connection.\n", r);
1764                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1765                                 return;
1766                         }
1767                 } else /* C_SYNC_SOURCE */ {
1768                         r = drbd_khelper(device, "before-resync-source");
1769                         r = (r >> 8) & 0xff;
1770                         if (r > 0) {
1771                                 if (r == 3) {
1772                                         drbd_info(device, "before-resync-source handler returned %d, "
1773                                                  "ignoring. Old userland tools?", r);
1774                                 } else {
1775                                         drbd_info(device, "before-resync-source handler returned %d, "
1776                                                  "dropping connection.\n", r);
1777                                         conn_request_state(connection,
1778                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1779                                         return;
1780                                 }
1781                         }
1782                 }
1783         }
1784
1785         if (current == connection->worker.task) {
1786                 /* The worker should not sleep waiting for state_mutex,
1787                    that can take long */
1788                 if (!mutex_trylock(device->state_mutex)) {
1789                         set_bit(B_RS_H_DONE, &device->flags);
1790                         device->start_resync_timer.expires = jiffies + HZ/5;
1791                         add_timer(&device->start_resync_timer);
1792                         return;
1793                 }
1794         } else {
1795                 mutex_lock(device->state_mutex);
1796         }
1797
1798         lock_all_resources();
1799         clear_bit(B_RS_H_DONE, &device->flags);
1800         /* Did some connection breakage or IO error race with us? */
1801         if (device->state.conn < C_CONNECTED
1802         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1803                 unlock_all_resources();
1804                 goto out;
1805         }
1806
1807         ns = drbd_read_state(device);
1808
1809         ns.aftr_isp = !_drbd_may_sync_now(device);
1810
1811         ns.conn = side;
1812
1813         if (side == C_SYNC_TARGET)
1814                 ns.disk = D_INCONSISTENT;
1815         else /* side == C_SYNC_SOURCE */
1816                 ns.pdsk = D_INCONSISTENT;
1817
1818         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1819         ns = drbd_read_state(device);
1820
1821         if (ns.conn < C_CONNECTED)
1822                 r = SS_UNKNOWN_ERROR;
1823
1824         if (r == SS_SUCCESS) {
1825                 unsigned long tw = drbd_bm_total_weight(device);
1826                 unsigned long now = jiffies;
1827                 int i;
1828
1829                 device->rs_failed    = 0;
1830                 device->rs_paused    = 0;
1831                 device->rs_same_csum = 0;
1832                 device->rs_last_sect_ev = 0;
1833                 device->rs_total     = tw;
1834                 device->rs_start     = now;
1835                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1836                         device->rs_mark_left[i] = tw;
1837                         device->rs_mark_time[i] = now;
1838                 }
1839                 drbd_pause_after(device);
1840                 /* Forget potentially stale cached per resync extent bit-counts.
1841                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1842                  * disabled, and know the disk state is ok. */
1843                 spin_lock(&device->al_lock);
1844                 lc_reset(device->resync);
1845                 device->resync_locked = 0;
1846                 device->resync_wenr = LC_FREE;
1847                 spin_unlock(&device->al_lock);
1848         }
1849         unlock_all_resources();
1850
1851         if (r == SS_SUCCESS) {
1852                 wake_up(&device->al_wait); /* for lc_reset() above */
1853                 /* reset rs_last_bcast when a resync or verify is started,
1854                  * to deal with potential jiffies wrap. */
1855                 device->rs_last_bcast = jiffies - HZ;
1856
1857                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1858                      drbd_conn_str(ns.conn),
1859                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1860                      (unsigned long) device->rs_total);
1861                 if (side == C_SYNC_TARGET) {
1862                         device->bm_resync_fo = 0;
1863                         device->use_csums = use_checksum_based_resync(connection, device);
1864                 } else {
1865                         device->use_csums = false;
1866                 }
1867
1868                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1869                  * with w_send_oos, or the sync target will get confused as to
1870                  * how much bits to resync.  We cannot do that always, because for an
1871                  * empty resync and protocol < 95, we need to do it here, as we call
1872                  * drbd_resync_finished from here in that case.
1873                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1874                  * and from after_state_ch otherwise. */
1875                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1876                         drbd_gen_and_send_sync_uuid(peer_device);
1877
1878                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1879                         /* This still has a race (about when exactly the peers
1880                          * detect connection loss) that can lead to a full sync
1881                          * on next handshake. In 8.3.9 we fixed this with explicit
1882                          * resync-finished notifications, but the fix
1883                          * introduces a protocol change.  Sleeping for some
1884                          * time longer than the ping interval + timeout on the
1885                          * SyncSource, to give the SyncTarget the chance to
1886                          * detect connection loss, then waiting for a ping
1887                          * response (implicit in drbd_resync_finished) reduces
1888                          * the race considerably, but does not solve it. */
1889                         if (side == C_SYNC_SOURCE) {
1890                                 struct net_conf *nc;
1891                                 int timeo;
1892
1893                                 rcu_read_lock();
1894                                 nc = rcu_dereference(connection->net_conf);
1895                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1896                                 rcu_read_unlock();
1897                                 schedule_timeout_interruptible(timeo);
1898                         }
1899                         drbd_resync_finished(device);
1900                 }
1901
1902                 drbd_rs_controller_reset(device);
1903                 /* ns.conn may already be != device->state.conn,
1904                  * we may have been paused in between, or become paused until
1905                  * the timer triggers.
1906                  * No matter, that is handled in resync_timer_fn() */
1907                 if (ns.conn == C_SYNC_TARGET)
1908                         mod_timer(&device->resync_timer, jiffies);
1909
1910                 drbd_md_sync(device);
1911         }
1912         put_ldev(device);
1913 out:
1914         mutex_unlock(device->state_mutex);
1915 }
1916
1917 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1918 {
1919         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1920         device->rs_last_bcast = jiffies;
1921
1922         if (!get_ldev(device))
1923                 return;
1924
1925         drbd_bm_write_lazy(device, 0);
1926         if (resync_done && is_sync_state(device->state.conn))
1927                 drbd_resync_finished(device);
1928
1929         drbd_bcast_event(device, &sib);
1930         /* update timestamp, in case it took a while to write out stuff */
1931         device->rs_last_bcast = jiffies;
1932         put_ldev(device);
1933 }
1934
1935 static void drbd_ldev_destroy(struct drbd_device *device)
1936 {
1937         lc_destroy(device->resync);
1938         device->resync = NULL;
1939         lc_destroy(device->act_log);
1940         device->act_log = NULL;
1941
1942         __acquire(local);
1943         drbd_backing_dev_free(device, device->ldev);
1944         device->ldev = NULL;
1945         __release(local);
1946
1947         clear_bit(GOING_DISKLESS, &device->flags);
1948         wake_up(&device->misc_wait);
1949 }
1950
1951 static void go_diskless(struct drbd_device *device)
1952 {
1953         D_ASSERT(device, device->state.disk == D_FAILED);
1954         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1955          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1956          * the protected members anymore, though, so once put_ldev reaches zero
1957          * again, it will be safe to free them. */
1958
1959         /* Try to write changed bitmap pages, read errors may have just
1960          * set some bits outside the area covered by the activity log.
1961          *
1962          * If we have an IO error during the bitmap writeout,
1963          * we will want a full sync next time, just in case.
1964          * (Do we want a specific meta data flag for this?)
1965          *
1966          * If that does not make it to stable storage either,
1967          * we cannot do anything about that anymore.
1968          *
1969          * We still need to check if both bitmap and ldev are present, we may
1970          * end up here after a failed attach, before ldev was even assigned.
1971          */
1972         if (device->bitmap && device->ldev) {
1973                 /* An interrupted resync or similar is allowed to recounts bits
1974                  * while we detach.
1975                  * Any modifications would not be expected anymore, though.
1976                  */
1977                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1978                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1979                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1980                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1981                                 drbd_md_sync(device);
1982                         }
1983                 }
1984         }
1985
1986         drbd_force_state(device, NS(disk, D_DISKLESS));
1987 }
1988
1989 static int do_md_sync(struct drbd_device *device)
1990 {
1991         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1992         drbd_md_sync(device);
1993         return 0;
1994 }
1995
1996 /* only called from drbd_worker thread, no locking */
1997 void __update_timing_details(
1998                 struct drbd_thread_timing_details *tdp,
1999                 unsigned int *cb_nr,
2000                 void *cb,
2001                 const char *fn, const unsigned int line)
2002 {
2003         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2004         struct drbd_thread_timing_details *td = tdp + i;
2005
2006         td->start_jif = jiffies;
2007         td->cb_addr = cb;
2008         td->caller_fn = fn;
2009         td->line = line;
2010         td->cb_nr = *cb_nr;
2011
2012         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2013         td = tdp + i;
2014         memset(td, 0, sizeof(*td));
2015
2016         ++(*cb_nr);
2017 }
2018
2019 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2020 {
2021         if (test_bit(MD_SYNC, &todo))
2022                 do_md_sync(device);
2023         if (test_bit(RS_DONE, &todo) ||
2024             test_bit(RS_PROGRESS, &todo))
2025                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2026         if (test_bit(GO_DISKLESS, &todo))
2027                 go_diskless(device);
2028         if (test_bit(DESTROY_DISK, &todo))
2029                 drbd_ldev_destroy(device);
2030         if (test_bit(RS_START, &todo))
2031                 do_start_resync(device);
2032 }
2033
2034 #define DRBD_DEVICE_WORK_MASK   \
2035         ((1UL << GO_DISKLESS)   \
2036         |(1UL << DESTROY_DISK)  \
2037         |(1UL << MD_SYNC)       \
2038         |(1UL << RS_START)      \
2039         |(1UL << RS_PROGRESS)   \
2040         |(1UL << RS_DONE)       \
2041         )
2042
2043 static unsigned long get_work_bits(unsigned long *flags)
2044 {
2045         unsigned long old, new;
2046         do {
2047                 old = *flags;
2048                 new = old & ~DRBD_DEVICE_WORK_MASK;
2049         } while (cmpxchg(flags, old, new) != old);
2050         return old & DRBD_DEVICE_WORK_MASK;
2051 }
2052
2053 static void do_unqueued_work(struct drbd_connection *connection)
2054 {
2055         struct drbd_peer_device *peer_device;
2056         int vnr;
2057
2058         rcu_read_lock();
2059         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2060                 struct drbd_device *device = peer_device->device;
2061                 unsigned long todo = get_work_bits(&device->flags);
2062                 if (!todo)
2063                         continue;
2064
2065                 kref_get(&device->kref);
2066                 rcu_read_unlock();
2067                 do_device_work(device, todo);
2068                 kref_put(&device->kref, drbd_destroy_device);
2069                 rcu_read_lock();
2070         }
2071         rcu_read_unlock();
2072 }
2073
2074 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2075 {
2076         spin_lock_irq(&queue->q_lock);
2077         list_splice_tail_init(&queue->q, work_list);
2078         spin_unlock_irq(&queue->q_lock);
2079         return !list_empty(work_list);
2080 }
2081
2082 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2083 {
2084         DEFINE_WAIT(wait);
2085         struct net_conf *nc;
2086         int uncork, cork;
2087
2088         dequeue_work_batch(&connection->sender_work, work_list);
2089         if (!list_empty(work_list))
2090                 return;
2091
2092         /* Still nothing to do?
2093          * Maybe we still need to close the current epoch,
2094          * even if no new requests are queued yet.
2095          *
2096          * Also, poke TCP, just in case.
2097          * Then wait for new work (or signal). */
2098         rcu_read_lock();
2099         nc = rcu_dereference(connection->net_conf);
2100         uncork = nc ? nc->tcp_cork : 0;
2101         rcu_read_unlock();
2102         if (uncork) {
2103                 mutex_lock(&connection->data.mutex);
2104                 if (connection->data.socket)
2105                         tcp_sock_set_cork(connection->data.socket->sk, false);
2106                 mutex_unlock(&connection->data.mutex);
2107         }
2108
2109         for (;;) {
2110                 int send_barrier;
2111                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2112                 spin_lock_irq(&connection->resource->req_lock);
2113                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2114                 if (!list_empty(&connection->sender_work.q))
2115                         list_splice_tail_init(&connection->sender_work.q, work_list);
2116                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2117                 if (!list_empty(work_list) || signal_pending(current)) {
2118                         spin_unlock_irq(&connection->resource->req_lock);
2119                         break;
2120                 }
2121
2122                 /* We found nothing new to do, no to-be-communicated request,
2123                  * no other work item.  We may still need to close the last
2124                  * epoch.  Next incoming request epoch will be connection ->
2125                  * current transfer log epoch number.  If that is different
2126                  * from the epoch of the last request we communicated, it is
2127                  * safe to send the epoch separating barrier now.
2128                  */
2129                 send_barrier =
2130                         atomic_read(&connection->current_tle_nr) !=
2131                         connection->send.current_epoch_nr;
2132                 spin_unlock_irq(&connection->resource->req_lock);
2133
2134                 if (send_barrier)
2135                         maybe_send_barrier(connection,
2136                                         connection->send.current_epoch_nr + 1);
2137
2138                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2139                         break;
2140
2141                 /* drbd_send() may have called flush_signals() */
2142                 if (get_t_state(&connection->worker) != RUNNING)
2143                         break;
2144
2145                 schedule();
2146                 /* may be woken up for other things but new work, too,
2147                  * e.g. if the current epoch got closed.
2148                  * In which case we send the barrier above. */
2149         }
2150         finish_wait(&connection->sender_work.q_wait, &wait);
2151
2152         /* someone may have changed the config while we have been waiting above. */
2153         rcu_read_lock();
2154         nc = rcu_dereference(connection->net_conf);
2155         cork = nc ? nc->tcp_cork : 0;
2156         rcu_read_unlock();
2157         mutex_lock(&connection->data.mutex);
2158         if (connection->data.socket) {
2159                 if (cork)
2160                         tcp_sock_set_cork(connection->data.socket->sk, true);
2161                 else if (!uncork)
2162                         tcp_sock_set_cork(connection->data.socket->sk, false);
2163         }
2164         mutex_unlock(&connection->data.mutex);
2165 }
2166
2167 int drbd_worker(struct drbd_thread *thi)
2168 {
2169         struct drbd_connection *connection = thi->connection;
2170         struct drbd_work *w = NULL;
2171         struct drbd_peer_device *peer_device;
2172         LIST_HEAD(work_list);
2173         int vnr;
2174
2175         while (get_t_state(thi) == RUNNING) {
2176                 drbd_thread_current_set_cpu(thi);
2177
2178                 if (list_empty(&work_list)) {
2179                         update_worker_timing_details(connection, wait_for_work);
2180                         wait_for_work(connection, &work_list);
2181                 }
2182
2183                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2184                         update_worker_timing_details(connection, do_unqueued_work);
2185                         do_unqueued_work(connection);
2186                 }
2187
2188                 if (signal_pending(current)) {
2189                         flush_signals(current);
2190                         if (get_t_state(thi) == RUNNING) {
2191                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2192                                 continue;
2193                         }
2194                         break;
2195                 }
2196
2197                 if (get_t_state(thi) != RUNNING)
2198                         break;
2199
2200                 if (!list_empty(&work_list)) {
2201                         w = list_first_entry(&work_list, struct drbd_work, list);
2202                         list_del_init(&w->list);
2203                         update_worker_timing_details(connection, w->cb);
2204                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2205                                 continue;
2206                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2207                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2208                 }
2209         }
2210
2211         do {
2212                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2213                         update_worker_timing_details(connection, do_unqueued_work);
2214                         do_unqueued_work(connection);
2215                 }
2216                 if (!list_empty(&work_list)) {
2217                         w = list_first_entry(&work_list, struct drbd_work, list);
2218                         list_del_init(&w->list);
2219                         update_worker_timing_details(connection, w->cb);
2220                         w->cb(w, 1);
2221                 } else
2222                         dequeue_work_batch(&connection->sender_work, &work_list);
2223         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2224
2225         rcu_read_lock();
2226         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2227                 struct drbd_device *device = peer_device->device;
2228                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2229                 kref_get(&device->kref);
2230                 rcu_read_unlock();
2231                 drbd_device_cleanup(device);
2232                 kref_put(&device->kref, drbd_destroy_device);
2233                 rcu_read_lock();
2234         }
2235         rcu_read_unlock();
2236
2237         return 0;
2238 }