Merge tag 'v5.11' into next
[linux-2.6-microblaze.git] / drivers / block / drbd / drbd_worker.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_worker.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12 */
13
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30
31 static int make_ov_request(struct drbd_device *, int);
32 static int make_resync_request(struct drbd_device *, int);
33
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52         struct drbd_device *device;
53
54         device = bio->bi_private;
55         device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57         /* special case: drbd_md_read() during drbd_adm_attach() */
58         if (device->ldev)
59                 put_ldev(device);
60         bio_put(bio);
61
62         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63          * to timeout on the lower level device, and eventually detach from it.
64          * If this io completion runs after that timeout expired, this
65          * drbd_md_put_buffer() may allow us to finally try and re-attach.
66          * During normal operation, this only puts that extra reference
67          * down to 1 again.
68          * Make sure we first drop the reference, and only then signal
69          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70          * next drbd_md_sync_page_io(), that we trigger the
71          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72          */
73         drbd_md_put_buffer(device);
74         device->md_io.done = 1;
75         wake_up(&device->misc_wait);
76 }
77
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83         unsigned long flags = 0;
84         struct drbd_peer_device *peer_device = peer_req->peer_device;
85         struct drbd_device *device = peer_device->device;
86
87         spin_lock_irqsave(&device->resource->req_lock, flags);
88         device->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&device->read_ee))
91                 wake_up(&device->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
94         spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97         put_ldev(device);
98 }
99
100 /* writes on behalf of the partner, or resync writes,
101  * "submitted" by the receiver, final stage.  */
102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
103 {
104         unsigned long flags = 0;
105         struct drbd_peer_device *peer_device = peer_req->peer_device;
106         struct drbd_device *device = peer_device->device;
107         struct drbd_connection *connection = peer_device->connection;
108         struct drbd_interval i;
109         int do_wake;
110         u64 block_id;
111         int do_al_complete_io;
112
113         /* after we moved peer_req to done_ee,
114          * we may no longer access it,
115          * it may be freed/reused already!
116          * (as soon as we release the req_lock) */
117         i = peer_req->i;
118         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
119         block_id = peer_req->block_id;
120         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
121
122         if (peer_req->flags & EE_WAS_ERROR) {
123                 /* In protocol != C, we usually do not send write acks.
124                  * In case of a write error, send the neg ack anyways. */
125                 if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
126                         inc_unacked(device);
127                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
128         }
129
130         spin_lock_irqsave(&device->resource->req_lock, flags);
131         device->writ_cnt += peer_req->i.size >> 9;
132         list_move_tail(&peer_req->w.list, &device->done_ee);
133
134         /*
135          * Do not remove from the write_requests tree here: we did not send the
136          * Ack yet and did not wake possibly waiting conflicting requests.
137          * Removed from the tree from "drbd_process_done_ee" within the
138          * appropriate dw.cb (e_end_block/e_end_resync_block) or from
139          * _drbd_clear_done_ee.
140          */
141
142         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
143
144         /* FIXME do we want to detach for failed REQ_OP_DISCARD?
145          * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
146         if (peer_req->flags & EE_WAS_ERROR)
147                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
148
149         if (connection->cstate >= C_WF_REPORT_PARAMS) {
150                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
151                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
152                         kref_put(&device->kref, drbd_destroy_device);
153         }
154         spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156         if (block_id == ID_SYNCER)
157                 drbd_rs_complete_io(device, i.sector);
158
159         if (do_wake)
160                 wake_up(&device->ee_wait);
161
162         if (do_al_complete_io)
163                 drbd_al_complete_io(device, &i);
164
165         put_ldev(device);
166 }
167
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173         struct drbd_peer_request *peer_req = bio->bi_private;
174         struct drbd_device *device = peer_req->peer_device->device;
175         bool is_write = bio_data_dir(bio) == WRITE;
176         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177                           bio_op(bio) == REQ_OP_DISCARD;
178
179         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_status,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_status)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 static void
198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 {
200         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
201                 device->minor, device->resource->name, device->vnr);
202 }
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which do no longer
220          * complete requests at all, not even do error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful swichover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we still will complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (__ratelimit(&drbd_ratelimit_state))
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275         bio_put(bio);
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         SHASH_DESC_ON_STACK(desc, tfm);
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293         void *src;
294
295         desc->tfm = tfm;
296
297         crypto_shash_init(desc);
298
299         src = kmap_atomic(page);
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 crypto_shash_update(desc, src, PAGE_SIZE);
303                 kunmap_atomic(src);
304                 page = tmp;
305                 src = kmap_atomic(page);
306         }
307         /* and now the last, possibly only partially used page */
308         len = peer_req->i.size & (PAGE_SIZE - 1);
309         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310         kunmap_atomic(src);
311
312         crypto_shash_final(desc, digest);
313         shash_desc_zero(desc);
314 }
315
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318         SHASH_DESC_ON_STACK(desc, tfm);
319         struct bio_vec bvec;
320         struct bvec_iter iter;
321
322         desc->tfm = tfm;
323
324         crypto_shash_init(desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 u8 *src;
328
329                 src = kmap_atomic(bvec.bv_page);
330                 crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
331                 kunmap_atomic(src);
332
333                 /* REQ_OP_WRITE_SAME has only one segment,
334                  * checksum the payload only once. */
335                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
336                         break;
337         }
338         crypto_shash_final(desc, digest);
339         shash_desc_zero(desc);
340 }
341
342 /* MAYBE merge common code with w_e_end_ov_req */
343 static int w_e_send_csum(struct drbd_work *w, int cancel)
344 {
345         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
346         struct drbd_peer_device *peer_device = peer_req->peer_device;
347         struct drbd_device *device = peer_device->device;
348         int digest_size;
349         void *digest;
350         int err = 0;
351
352         if (unlikely(cancel))
353                 goto out;
354
355         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
356                 goto out;
357
358         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
359         digest = kmalloc(digest_size, GFP_NOIO);
360         if (digest) {
361                 sector_t sector = peer_req->i.sector;
362                 unsigned int size = peer_req->i.size;
363                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
364                 /* Free peer_req and pages before send.
365                  * In case we block on congestion, we could otherwise run into
366                  * some distributed deadlock, if the other side blocks on
367                  * congestion as well, because our receiver blocks in
368                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
369                 drbd_free_peer_req(device, peer_req);
370                 peer_req = NULL;
371                 inc_rs_pending(device);
372                 err = drbd_send_drequest_csum(peer_device, sector, size,
373                                               digest, digest_size,
374                                               P_CSUM_RS_REQUEST);
375                 kfree(digest);
376         } else {
377                 drbd_err(device, "kmalloc() of digest failed.\n");
378                 err = -ENOMEM;
379         }
380
381 out:
382         if (peer_req)
383                 drbd_free_peer_req(device, peer_req);
384
385         if (unlikely(err))
386                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
387         return err;
388 }
389
390 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
391
392 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
393 {
394         struct drbd_device *device = peer_device->device;
395         struct drbd_peer_request *peer_req;
396
397         if (!get_ldev(device))
398                 return -EIO;
399
400         /* GFP_TRY, because if there is no memory available right now, this may
401          * be rescheduled for later. It is "only" background resync, after all. */
402         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
403                                        size, size, GFP_TRY);
404         if (!peer_req)
405                 goto defer;
406
407         peer_req->w.cb = w_e_send_csum;
408         spin_lock_irq(&device->resource->req_lock);
409         list_add_tail(&peer_req->w.list, &device->read_ee);
410         spin_unlock_irq(&device->resource->req_lock);
411
412         atomic_add(size >> 9, &device->rs_sect_ev);
413         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
414                                      DRBD_FAULT_RS_RD) == 0)
415                 return 0;
416
417         /* If it failed because of ENOMEM, retry should help.  If it failed
418          * because bio_add_page failed (probably broken lower level driver),
419          * retry may or may not help.
420          * If it does not, you may need to force disconnect. */
421         spin_lock_irq(&device->resource->req_lock);
422         list_del(&peer_req->w.list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         drbd_free_peer_req(device, peer_req);
426 defer:
427         put_ldev(device);
428         return -EAGAIN;
429 }
430
431 int w_resync_timer(struct drbd_work *w, int cancel)
432 {
433         struct drbd_device *device =
434                 container_of(w, struct drbd_device, resync_work);
435
436         switch (device->state.conn) {
437         case C_VERIFY_S:
438                 make_ov_request(device, cancel);
439                 break;
440         case C_SYNC_TARGET:
441                 make_resync_request(device, cancel);
442                 break;
443         }
444
445         return 0;
446 }
447
448 void resync_timer_fn(struct timer_list *t)
449 {
450         struct drbd_device *device = from_timer(device, t, resync_timer);
451
452         drbd_queue_work_if_unqueued(
453                 &first_peer_device(device)->connection->sender_work,
454                 &device->resync_work);
455 }
456
457 static void fifo_set(struct fifo_buffer *fb, int value)
458 {
459         int i;
460
461         for (i = 0; i < fb->size; i++)
462                 fb->values[i] = value;
463 }
464
465 static int fifo_push(struct fifo_buffer *fb, int value)
466 {
467         int ov;
468
469         ov = fb->values[fb->head_index];
470         fb->values[fb->head_index++] = value;
471
472         if (fb->head_index >= fb->size)
473                 fb->head_index = 0;
474
475         return ov;
476 }
477
478 static void fifo_add_val(struct fifo_buffer *fb, int value)
479 {
480         int i;
481
482         for (i = 0; i < fb->size; i++)
483                 fb->values[i] += value;
484 }
485
486 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
487 {
488         struct fifo_buffer *fb;
489
490         fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
491         if (!fb)
492                 return NULL;
493
494         fb->head_index = 0;
495         fb->size = fifo_size;
496         fb->total = 0;
497
498         return fb;
499 }
500
501 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
502 {
503         struct disk_conf *dc;
504         unsigned int want;     /* The number of sectors we want in-flight */
505         int req_sect; /* Number of sectors to request in this turn */
506         int correction; /* Number of sectors more we need in-flight */
507         int cps; /* correction per invocation of drbd_rs_controller() */
508         int steps; /* Number of time steps to plan ahead */
509         int curr_corr;
510         int max_sect;
511         struct fifo_buffer *plan;
512
513         dc = rcu_dereference(device->ldev->disk_conf);
514         plan = rcu_dereference(device->rs_plan_s);
515
516         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
517
518         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
520         } else { /* normal path */
521                 want = dc->c_fill_target ? dc->c_fill_target :
522                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
523         }
524
525         correction = want - device->rs_in_flight - plan->total;
526
527         /* Plan ahead */
528         cps = correction / steps;
529         fifo_add_val(plan, cps);
530         plan->total += cps * steps;
531
532         /* What we do in this step */
533         curr_corr = fifo_push(plan, 0);
534         plan->total -= curr_corr;
535
536         req_sect = sect_in + curr_corr;
537         if (req_sect < 0)
538                 req_sect = 0;
539
540         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541         if (req_sect > max_sect)
542                 req_sect = max_sect;
543
544         /*
545         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546                  sect_in, device->rs_in_flight, want, correction,
547                  steps, cps, device->rs_planed, curr_corr, req_sect);
548         */
549
550         return req_sect;
551 }
552
553 static int drbd_rs_number_requests(struct drbd_device *device)
554 {
555         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
556         int number, mxb;
557
558         sect_in = atomic_xchg(&device->rs_sect_in, 0);
559         device->rs_in_flight -= sect_in;
560
561         rcu_read_lock();
562         mxb = drbd_get_max_buffers(device) / 2;
563         if (rcu_dereference(device->rs_plan_s)->size) {
564                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
566         } else {
567                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
569         }
570         rcu_read_unlock();
571
572         /* Don't have more than "max-buffers"/2 in-flight.
573          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574          * potentially causing a distributed deadlock on congestion during
575          * online-verify or (checksum-based) resync, if max-buffers,
576          * socket buffer sizes and resync rate settings are mis-configured. */
577
578         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579          * mxb (as used here, and in drbd_alloc_pages on the peer) is
580          * "number of pages" (typically also 4k),
581          * but "rs_in_flight" is in "sectors" (512 Byte). */
582         if (mxb - device->rs_in_flight/8 < number)
583                 number = mxb - device->rs_in_flight/8;
584
585         return number;
586 }
587
588 static int make_resync_request(struct drbd_device *const device, int cancel)
589 {
590         struct drbd_peer_device *const peer_device = first_peer_device(device);
591         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
592         unsigned long bit;
593         sector_t sector;
594         const sector_t capacity = get_capacity(device->vdisk);
595         int max_bio_size;
596         int number, rollback_i, size;
597         int align, requeue = 0;
598         int i = 0;
599         int discard_granularity = 0;
600
601         if (unlikely(cancel))
602                 return 0;
603
604         if (device->rs_total == 0) {
605                 /* empty resync? */
606                 drbd_resync_finished(device);
607                 return 0;
608         }
609
610         if (!get_ldev(device)) {
611                 /* Since we only need to access device->rsync a
612                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
613                    to continue resync with a broken disk makes no sense at
614                    all */
615                 drbd_err(device, "Disk broke down during resync!\n");
616                 return 0;
617         }
618
619         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
620                 rcu_read_lock();
621                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
622                 rcu_read_unlock();
623         }
624
625         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
626         number = drbd_rs_number_requests(device);
627         if (number <= 0)
628                 goto requeue;
629
630         for (i = 0; i < number; i++) {
631                 /* Stop generating RS requests when half of the send buffer is filled,
632                  * but notify TCP that we'd like to have more space. */
633                 mutex_lock(&connection->data.mutex);
634                 if (connection->data.socket) {
635                         struct sock *sk = connection->data.socket->sk;
636                         int queued = sk->sk_wmem_queued;
637                         int sndbuf = sk->sk_sndbuf;
638                         if (queued > sndbuf / 2) {
639                                 requeue = 1;
640                                 if (sk->sk_socket)
641                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
642                         }
643                 } else
644                         requeue = 1;
645                 mutex_unlock(&connection->data.mutex);
646                 if (requeue)
647                         goto requeue;
648
649 next_sector:
650                 size = BM_BLOCK_SIZE;
651                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
652
653                 if (bit == DRBD_END_OF_BITMAP) {
654                         device->bm_resync_fo = drbd_bm_bits(device);
655                         put_ldev(device);
656                         return 0;
657                 }
658
659                 sector = BM_BIT_TO_SECT(bit);
660
661                 if (drbd_try_rs_begin_io(device, sector)) {
662                         device->bm_resync_fo = bit;
663                         goto requeue;
664                 }
665                 device->bm_resync_fo = bit + 1;
666
667                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
668                         drbd_rs_complete_io(device, sector);
669                         goto next_sector;
670                 }
671
672 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
673                 /* try to find some adjacent bits.
674                  * we stop if we have already the maximum req size.
675                  *
676                  * Additionally always align bigger requests, in order to
677                  * be prepared for all stripe sizes of software RAIDs.
678                  */
679                 align = 1;
680                 rollback_i = i;
681                 while (i < number) {
682                         if (size + BM_BLOCK_SIZE > max_bio_size)
683                                 break;
684
685                         /* Be always aligned */
686                         if (sector & ((1<<(align+3))-1))
687                                 break;
688
689                         if (discard_granularity && size == discard_granularity)
690                                 break;
691
692                         /* do not cross extent boundaries */
693                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
694                                 break;
695                         /* now, is it actually dirty, after all?
696                          * caution, drbd_bm_test_bit is tri-state for some
697                          * obscure reason; ( b == 0 ) would get the out-of-band
698                          * only accidentally right because of the "oddly sized"
699                          * adjustment below */
700                         if (drbd_bm_test_bit(device, bit+1) != 1)
701                                 break;
702                         bit++;
703                         size += BM_BLOCK_SIZE;
704                         if ((BM_BLOCK_SIZE << align) <= size)
705                                 align++;
706                         i++;
707                 }
708                 /* if we merged some,
709                  * reset the offset to start the next drbd_bm_find_next from */
710                 if (size > BM_BLOCK_SIZE)
711                         device->bm_resync_fo = bit + 1;
712 #endif
713
714                 /* adjust very last sectors, in case we are oddly sized */
715                 if (sector + (size>>9) > capacity)
716                         size = (capacity-sector)<<9;
717
718                 if (device->use_csums) {
719                         switch (read_for_csum(peer_device, sector, size)) {
720                         case -EIO: /* Disk failure */
721                                 put_ldev(device);
722                                 return -EIO;
723                         case -EAGAIN: /* allocation failed, or ldev busy */
724                                 drbd_rs_complete_io(device, sector);
725                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
726                                 i = rollback_i;
727                                 goto requeue;
728                         case 0:
729                                 /* everything ok */
730                                 break;
731                         default:
732                                 BUG();
733                         }
734                 } else {
735                         int err;
736
737                         inc_rs_pending(device);
738                         err = drbd_send_drequest(peer_device,
739                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
740                                                  sector, size, ID_SYNCER);
741                         if (err) {
742                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
743                                 dec_rs_pending(device);
744                                 put_ldev(device);
745                                 return err;
746                         }
747                 }
748         }
749
750         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
751                 /* last syncer _request_ was sent,
752                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
753                  * next sync group will resume), as soon as we receive the last
754                  * resync data block, and the last bit is cleared.
755                  * until then resync "work" is "inactive" ...
756                  */
757                 put_ldev(device);
758                 return 0;
759         }
760
761  requeue:
762         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
763         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
764         put_ldev(device);
765         return 0;
766 }
767
768 static int make_ov_request(struct drbd_device *device, int cancel)
769 {
770         int number, i, size;
771         sector_t sector;
772         const sector_t capacity = get_capacity(device->vdisk);
773         bool stop_sector_reached = false;
774
775         if (unlikely(cancel))
776                 return 1;
777
778         number = drbd_rs_number_requests(device);
779
780         sector = device->ov_position;
781         for (i = 0; i < number; i++) {
782                 if (sector >= capacity)
783                         return 1;
784
785                 /* We check for "finished" only in the reply path:
786                  * w_e_end_ov_reply().
787                  * We need to send at least one request out. */
788                 stop_sector_reached = i > 0
789                         && verify_can_do_stop_sector(device)
790                         && sector >= device->ov_stop_sector;
791                 if (stop_sector_reached)
792                         break;
793
794                 size = BM_BLOCK_SIZE;
795
796                 if (drbd_try_rs_begin_io(device, sector)) {
797                         device->ov_position = sector;
798                         goto requeue;
799                 }
800
801                 if (sector + (size>>9) > capacity)
802                         size = (capacity-sector)<<9;
803
804                 inc_rs_pending(device);
805                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
806                         dec_rs_pending(device);
807                         return 0;
808                 }
809                 sector += BM_SECT_PER_BIT;
810         }
811         device->ov_position = sector;
812
813  requeue:
814         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
815         if (i == 0 || !stop_sector_reached)
816                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
817         return 1;
818 }
819
820 int w_ov_finished(struct drbd_work *w, int cancel)
821 {
822         struct drbd_device_work *dw =
823                 container_of(w, struct drbd_device_work, w);
824         struct drbd_device *device = dw->device;
825         kfree(dw);
826         ov_out_of_sync_print(device);
827         drbd_resync_finished(device);
828
829         return 0;
830 }
831
832 static int w_resync_finished(struct drbd_work *w, int cancel)
833 {
834         struct drbd_device_work *dw =
835                 container_of(w, struct drbd_device_work, w);
836         struct drbd_device *device = dw->device;
837         kfree(dw);
838
839         drbd_resync_finished(device);
840
841         return 0;
842 }
843
844 static void ping_peer(struct drbd_device *device)
845 {
846         struct drbd_connection *connection = first_peer_device(device)->connection;
847
848         clear_bit(GOT_PING_ACK, &connection->flags);
849         request_ping(connection);
850         wait_event(connection->ping_wait,
851                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
852 }
853
854 int drbd_resync_finished(struct drbd_device *device)
855 {
856         struct drbd_connection *connection = first_peer_device(device)->connection;
857         unsigned long db, dt, dbdt;
858         unsigned long n_oos;
859         union drbd_state os, ns;
860         struct drbd_device_work *dw;
861         char *khelper_cmd = NULL;
862         int verify_done = 0;
863
864         /* Remove all elements from the resync LRU. Since future actions
865          * might set bits in the (main) bitmap, then the entries in the
866          * resync LRU would be wrong. */
867         if (drbd_rs_del_all(device)) {
868                 /* In case this is not possible now, most probably because
869                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
870                  * queue (or even the read operations for those packets
871                  * is not finished by now).   Retry in 100ms. */
872
873                 schedule_timeout_interruptible(HZ / 10);
874                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
875                 if (dw) {
876                         dw->w.cb = w_resync_finished;
877                         dw->device = device;
878                         drbd_queue_work(&connection->sender_work, &dw->w);
879                         return 1;
880                 }
881                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
882         }
883
884         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
885         if (dt <= 0)
886                 dt = 1;
887
888         db = device->rs_total;
889         /* adjust for verify start and stop sectors, respective reached position */
890         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
891                 db -= device->ov_left;
892
893         dbdt = Bit2KB(db/dt);
894         device->rs_paused /= HZ;
895
896         if (!get_ldev(device))
897                 goto out;
898
899         ping_peer(device);
900
901         spin_lock_irq(&device->resource->req_lock);
902         os = drbd_read_state(device);
903
904         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
905
906         /* This protects us against multiple calls (that can happen in the presence
907            of application IO), and against connectivity loss just before we arrive here. */
908         if (os.conn <= C_CONNECTED)
909                 goto out_unlock;
910
911         ns = os;
912         ns.conn = C_CONNECTED;
913
914         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
915              verify_done ? "Online verify" : "Resync",
916              dt + device->rs_paused, device->rs_paused, dbdt);
917
918         n_oos = drbd_bm_total_weight(device);
919
920         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
921                 if (n_oos) {
922                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
923                               n_oos, Bit2KB(1));
924                         khelper_cmd = "out-of-sync";
925                 }
926         } else {
927                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
928
929                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
930                         khelper_cmd = "after-resync-target";
931
932                 if (device->use_csums && device->rs_total) {
933                         const unsigned long s = device->rs_same_csum;
934                         const unsigned long t = device->rs_total;
935                         const int ratio =
936                                 (t == 0)     ? 0 :
937                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
938                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
939                              "transferred %luK total %luK\n",
940                              ratio,
941                              Bit2KB(device->rs_same_csum),
942                              Bit2KB(device->rs_total - device->rs_same_csum),
943                              Bit2KB(device->rs_total));
944                 }
945         }
946
947         if (device->rs_failed) {
948                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
949
950                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
951                         ns.disk = D_INCONSISTENT;
952                         ns.pdsk = D_UP_TO_DATE;
953                 } else {
954                         ns.disk = D_UP_TO_DATE;
955                         ns.pdsk = D_INCONSISTENT;
956                 }
957         } else {
958                 ns.disk = D_UP_TO_DATE;
959                 ns.pdsk = D_UP_TO_DATE;
960
961                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
962                         if (device->p_uuid) {
963                                 int i;
964                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
965                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
966                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
967                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
968                         } else {
969                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
970                         }
971                 }
972
973                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
974                         /* for verify runs, we don't update uuids here,
975                          * so there would be nothing to report. */
976                         drbd_uuid_set_bm(device, 0UL);
977                         drbd_print_uuids(device, "updated UUIDs");
978                         if (device->p_uuid) {
979                                 /* Now the two UUID sets are equal, update what we
980                                  * know of the peer. */
981                                 int i;
982                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
983                                         device->p_uuid[i] = device->ldev->md.uuid[i];
984                         }
985                 }
986         }
987
988         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
989 out_unlock:
990         spin_unlock_irq(&device->resource->req_lock);
991
992         /* If we have been sync source, and have an effective fencing-policy,
993          * once *all* volumes are back in sync, call "unfence". */
994         if (os.conn == C_SYNC_SOURCE) {
995                 enum drbd_disk_state disk_state = D_MASK;
996                 enum drbd_disk_state pdsk_state = D_MASK;
997                 enum drbd_fencing_p fp = FP_DONT_CARE;
998
999                 rcu_read_lock();
1000                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001                 if (fp != FP_DONT_CARE) {
1002                         struct drbd_peer_device *peer_device;
1003                         int vnr;
1004                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005                                 struct drbd_device *device = peer_device->device;
1006                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008                         }
1009                 }
1010                 rcu_read_unlock();
1011                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012                         conn_khelper(connection, "unfence-peer");
1013         }
1014
1015         put_ldev(device);
1016 out:
1017         device->rs_total  = 0;
1018         device->rs_failed = 0;
1019         device->rs_paused = 0;
1020
1021         /* reset start sector, if we reached end of device */
1022         if (verify_done && device->ov_left == 0)
1023                 device->ov_start_sector = 0;
1024
1025         drbd_md_sync(device);
1026
1027         if (khelper_cmd)
1028                 drbd_khelper(device, khelper_cmd);
1029
1030         return 1;
1031 }
1032
1033 /* helper */
1034 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035 {
1036         if (drbd_peer_req_has_active_page(peer_req)) {
1037                 /* This might happen if sendpage() has not finished */
1038                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1039                 atomic_add(i, &device->pp_in_use_by_net);
1040                 atomic_sub(i, &device->pp_in_use);
1041                 spin_lock_irq(&device->resource->req_lock);
1042                 list_add_tail(&peer_req->w.list, &device->net_ee);
1043                 spin_unlock_irq(&device->resource->req_lock);
1044                 wake_up(&drbd_pp_wait);
1045         } else
1046                 drbd_free_peer_req(device, peer_req);
1047 }
1048
1049 /**
1050  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051  * @w:          work object.
1052  * @cancel:     The connection will be closed anyways
1053  */
1054 int w_e_end_data_req(struct drbd_work *w, int cancel)
1055 {
1056         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057         struct drbd_peer_device *peer_device = peer_req->peer_device;
1058         struct drbd_device *device = peer_device->device;
1059         int err;
1060
1061         if (unlikely(cancel)) {
1062                 drbd_free_peer_req(device, peer_req);
1063                 dec_unacked(device);
1064                 return 0;
1065         }
1066
1067         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069         } else {
1070                 if (__ratelimit(&drbd_ratelimit_state))
1071                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072                             (unsigned long long)peer_req->i.sector);
1073
1074                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075         }
1076
1077         dec_unacked(device);
1078
1079         move_to_net_ee_or_free(device, peer_req);
1080
1081         if (unlikely(err))
1082                 drbd_err(device, "drbd_send_block() failed\n");
1083         return err;
1084 }
1085
1086 static bool all_zero(struct drbd_peer_request *peer_req)
1087 {
1088         struct page *page = peer_req->pages;
1089         unsigned int len = peer_req->i.size;
1090
1091         page_chain_for_each(page) {
1092                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093                 unsigned int i, words = l / sizeof(long);
1094                 unsigned long *d;
1095
1096                 d = kmap_atomic(page);
1097                 for (i = 0; i < words; i++) {
1098                         if (d[i]) {
1099                                 kunmap_atomic(d);
1100                                 return false;
1101                         }
1102                 }
1103                 kunmap_atomic(d);
1104                 len -= l;
1105         }
1106
1107         return true;
1108 }
1109
1110 /**
1111  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112  * @w:          work object.
1113  * @cancel:     The connection will be closed anyways
1114  */
1115 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116 {
1117         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118         struct drbd_peer_device *peer_device = peer_req->peer_device;
1119         struct drbd_device *device = peer_device->device;
1120         int err;
1121
1122         if (unlikely(cancel)) {
1123                 drbd_free_peer_req(device, peer_req);
1124                 dec_unacked(device);
1125                 return 0;
1126         }
1127
1128         if (get_ldev_if_state(device, D_FAILED)) {
1129                 drbd_rs_complete_io(device, peer_req->i.sector);
1130                 put_ldev(device);
1131         }
1132
1133         if (device->state.conn == C_AHEAD) {
1134                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137                         inc_rs_pending(device);
1138                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1140                         else
1141                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142                 } else {
1143                         if (__ratelimit(&drbd_ratelimit_state))
1144                                 drbd_err(device, "Not sending RSDataReply, "
1145                                     "partner DISKLESS!\n");
1146                         err = 0;
1147                 }
1148         } else {
1149                 if (__ratelimit(&drbd_ratelimit_state))
1150                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151                             (unsigned long long)peer_req->i.sector);
1152
1153                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154
1155                 /* update resync data with failure */
1156                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1157         }
1158
1159         dec_unacked(device);
1160
1161         move_to_net_ee_or_free(device, peer_req);
1162
1163         if (unlikely(err))
1164                 drbd_err(device, "drbd_send_block() failed\n");
1165         return err;
1166 }
1167
1168 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169 {
1170         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171         struct drbd_peer_device *peer_device = peer_req->peer_device;
1172         struct drbd_device *device = peer_device->device;
1173         struct digest_info *di;
1174         int digest_size;
1175         void *digest = NULL;
1176         int err, eq = 0;
1177
1178         if (unlikely(cancel)) {
1179                 drbd_free_peer_req(device, peer_req);
1180                 dec_unacked(device);
1181                 return 0;
1182         }
1183
1184         if (get_ldev(device)) {
1185                 drbd_rs_complete_io(device, peer_req->i.sector);
1186                 put_ldev(device);
1187         }
1188
1189         di = peer_req->digest;
1190
1191         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192                 /* quick hack to try to avoid a race against reconfiguration.
1193                  * a real fix would be much more involved,
1194                  * introducing more locking mechanisms */
1195                 if (peer_device->connection->csums_tfm) {
1196                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197                         D_ASSERT(device, digest_size == di->digest_size);
1198                         digest = kmalloc(digest_size, GFP_NOIO);
1199                 }
1200                 if (digest) {
1201                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202                         eq = !memcmp(digest, di->digest, digest_size);
1203                         kfree(digest);
1204                 }
1205
1206                 if (eq) {
1207                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1208                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1209                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211                 } else {
1212                         inc_rs_pending(device);
1213                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215                         kfree(di);
1216                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217                 }
1218         } else {
1219                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220                 if (__ratelimit(&drbd_ratelimit_state))
1221                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222         }
1223
1224         dec_unacked(device);
1225         move_to_net_ee_or_free(device, peer_req);
1226
1227         if (unlikely(err))
1228                 drbd_err(device, "drbd_send_block/ack() failed\n");
1229         return err;
1230 }
1231
1232 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233 {
1234         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235         struct drbd_peer_device *peer_device = peer_req->peer_device;
1236         struct drbd_device *device = peer_device->device;
1237         sector_t sector = peer_req->i.sector;
1238         unsigned int size = peer_req->i.size;
1239         int digest_size;
1240         void *digest;
1241         int err = 0;
1242
1243         if (unlikely(cancel))
1244                 goto out;
1245
1246         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247         digest = kmalloc(digest_size, GFP_NOIO);
1248         if (!digest) {
1249                 err = 1;        /* terminate the connection in case the allocation failed */
1250                 goto out;
1251         }
1252
1253         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255         else
1256                 memset(digest, 0, digest_size);
1257
1258         /* Free e and pages before send.
1259          * In case we block on congestion, we could otherwise run into
1260          * some distributed deadlock, if the other side blocks on
1261          * congestion as well, because our receiver blocks in
1262          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263         drbd_free_peer_req(device, peer_req);
1264         peer_req = NULL;
1265         inc_rs_pending(device);
1266         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267         if (err)
1268                 dec_rs_pending(device);
1269         kfree(digest);
1270
1271 out:
1272         if (peer_req)
1273                 drbd_free_peer_req(device, peer_req);
1274         dec_unacked(device);
1275         return err;
1276 }
1277
1278 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1279 {
1280         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281                 device->ov_last_oos_size += size>>9;
1282         } else {
1283                 device->ov_last_oos_start = sector;
1284                 device->ov_last_oos_size = size>>9;
1285         }
1286         drbd_set_out_of_sync(device, sector, size);
1287 }
1288
1289 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292         struct drbd_peer_device *peer_device = peer_req->peer_device;
1293         struct drbd_device *device = peer_device->device;
1294         struct digest_info *di;
1295         void *digest;
1296         sector_t sector = peer_req->i.sector;
1297         unsigned int size = peer_req->i.size;
1298         int digest_size;
1299         int err, eq = 0;
1300         bool stop_sector_reached = false;
1301
1302         if (unlikely(cancel)) {
1303                 drbd_free_peer_req(device, peer_req);
1304                 dec_unacked(device);
1305                 return 0;
1306         }
1307
1308         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309          * the resync lru has been cleaned up already */
1310         if (get_ldev(device)) {
1311                 drbd_rs_complete_io(device, peer_req->i.sector);
1312                 put_ldev(device);
1313         }
1314
1315         di = peer_req->digest;
1316
1317         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319                 digest = kmalloc(digest_size, GFP_NOIO);
1320                 if (digest) {
1321                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323                         D_ASSERT(device, digest_size == di->digest_size);
1324                         eq = !memcmp(digest, di->digest, digest_size);
1325                         kfree(digest);
1326                 }
1327         }
1328
1329         /* Free peer_req and pages before send.
1330          * In case we block on congestion, we could otherwise run into
1331          * some distributed deadlock, if the other side blocks on
1332          * congestion as well, because our receiver blocks in
1333          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334         drbd_free_peer_req(device, peer_req);
1335         if (!eq)
1336                 drbd_ov_out_of_sync_found(device, sector, size);
1337         else
1338                 ov_out_of_sync_print(device);
1339
1340         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343         dec_unacked(device);
1344
1345         --device->ov_left;
1346
1347         /* let's advance progress step marks only for every other megabyte */
1348         if ((device->ov_left & 0x200) == 0x200)
1349                 drbd_advance_rs_marks(device, device->ov_left);
1350
1351         stop_sector_reached = verify_can_do_stop_sector(device) &&
1352                 (sector + (size>>9)) >= device->ov_stop_sector;
1353
1354         if (device->ov_left == 0 || stop_sector_reached) {
1355                 ov_out_of_sync_print(device);
1356                 drbd_resync_finished(device);
1357         }
1358
1359         return err;
1360 }
1361
1362 /* FIXME
1363  * We need to track the number of pending barrier acks,
1364  * and to be able to wait for them.
1365  * See also comment in drbd_adm_attach before drbd_suspend_io.
1366  */
1367 static int drbd_send_barrier(struct drbd_connection *connection)
1368 {
1369         struct p_barrier *p;
1370         struct drbd_socket *sock;
1371
1372         sock = &connection->data;
1373         p = conn_prepare_command(connection, sock);
1374         if (!p)
1375                 return -EIO;
1376         p->barrier = connection->send.current_epoch_nr;
1377         p->pad = 0;
1378         connection->send.current_epoch_writes = 0;
1379         connection->send.last_sent_barrier_jif = jiffies;
1380
1381         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382 }
1383
1384 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385 {
1386         struct drbd_socket *sock = &pd->connection->data;
1387         if (!drbd_prepare_command(pd, sock))
1388                 return -EIO;
1389         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390 }
1391
1392 int w_send_write_hint(struct drbd_work *w, int cancel)
1393 {
1394         struct drbd_device *device =
1395                 container_of(w, struct drbd_device, unplug_work);
1396
1397         if (cancel)
1398                 return 0;
1399         return pd_send_unplug_remote(first_peer_device(device));
1400 }
1401
1402 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403 {
1404         if (!connection->send.seen_any_write_yet) {
1405                 connection->send.seen_any_write_yet = true;
1406                 connection->send.current_epoch_nr = epoch;
1407                 connection->send.current_epoch_writes = 0;
1408                 connection->send.last_sent_barrier_jif = jiffies;
1409         }
1410 }
1411
1412 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413 {
1414         /* re-init if first write on this connection */
1415         if (!connection->send.seen_any_write_yet)
1416                 return;
1417         if (connection->send.current_epoch_nr != epoch) {
1418                 if (connection->send.current_epoch_writes)
1419                         drbd_send_barrier(connection);
1420                 connection->send.current_epoch_nr = epoch;
1421         }
1422 }
1423
1424 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425 {
1426         struct drbd_request *req = container_of(w, struct drbd_request, w);
1427         struct drbd_device *device = req->device;
1428         struct drbd_peer_device *const peer_device = first_peer_device(device);
1429         struct drbd_connection *const connection = peer_device->connection;
1430         int err;
1431
1432         if (unlikely(cancel)) {
1433                 req_mod(req, SEND_CANCELED);
1434                 return 0;
1435         }
1436         req->pre_send_jif = jiffies;
1437
1438         /* this time, no connection->send.current_epoch_writes++;
1439          * If it was sent, it was the closing barrier for the last
1440          * replicated epoch, before we went into AHEAD mode.
1441          * No more barriers will be sent, until we leave AHEAD mode again. */
1442         maybe_send_barrier(connection, req->epoch);
1443
1444         err = drbd_send_out_of_sync(peer_device, req);
1445         req_mod(req, OOS_HANDED_TO_NETWORK);
1446
1447         return err;
1448 }
1449
1450 /**
1451  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452  * @w:          work object.
1453  * @cancel:     The connection will be closed anyways
1454  */
1455 int w_send_dblock(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_request *req = container_of(w, struct drbd_request, w);
1458         struct drbd_device *device = req->device;
1459         struct drbd_peer_device *const peer_device = first_peer_device(device);
1460         struct drbd_connection *connection = peer_device->connection;
1461         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462         int err;
1463
1464         if (unlikely(cancel)) {
1465                 req_mod(req, SEND_CANCELED);
1466                 return 0;
1467         }
1468         req->pre_send_jif = jiffies;
1469
1470         re_init_if_first_write(connection, req->epoch);
1471         maybe_send_barrier(connection, req->epoch);
1472         connection->send.current_epoch_writes++;
1473
1474         err = drbd_send_dblock(peer_device, req);
1475         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1476
1477         if (do_send_unplug && !err)
1478                 pd_send_unplug_remote(peer_device);
1479
1480         return err;
1481 }
1482
1483 /**
1484  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485  * @w:          work object.
1486  * @cancel:     The connection will be closed anyways
1487  */
1488 int w_send_read_req(struct drbd_work *w, int cancel)
1489 {
1490         struct drbd_request *req = container_of(w, struct drbd_request, w);
1491         struct drbd_device *device = req->device;
1492         struct drbd_peer_device *const peer_device = first_peer_device(device);
1493         struct drbd_connection *connection = peer_device->connection;
1494         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495         int err;
1496
1497         if (unlikely(cancel)) {
1498                 req_mod(req, SEND_CANCELED);
1499                 return 0;
1500         }
1501         req->pre_send_jif = jiffies;
1502
1503         /* Even read requests may close a write epoch,
1504          * if there was any yet. */
1505         maybe_send_barrier(connection, req->epoch);
1506
1507         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508                                  (unsigned long)req);
1509
1510         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1511
1512         if (do_send_unplug && !err)
1513                 pd_send_unplug_remote(peer_device);
1514
1515         return err;
1516 }
1517
1518 int w_restart_disk_io(struct drbd_work *w, int cancel)
1519 {
1520         struct drbd_request *req = container_of(w, struct drbd_request, w);
1521         struct drbd_device *device = req->device;
1522
1523         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524                 drbd_al_begin_io(device, &req->i);
1525
1526         drbd_req_make_private_bio(req, req->master_bio);
1527         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1528         submit_bio_noacct(req->private_bio);
1529
1530         return 0;
1531 }
1532
1533 static int _drbd_may_sync_now(struct drbd_device *device)
1534 {
1535         struct drbd_device *odev = device;
1536         int resync_after;
1537
1538         while (1) {
1539                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1540                         return 1;
1541                 rcu_read_lock();
1542                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1543                 rcu_read_unlock();
1544                 if (resync_after == -1)
1545                         return 1;
1546                 odev = minor_to_device(resync_after);
1547                 if (!odev)
1548                         return 1;
1549                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1550                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1551                     odev->state.aftr_isp || odev->state.peer_isp ||
1552                     odev->state.user_isp)
1553                         return 0;
1554         }
1555 }
1556
1557 /**
1558  * drbd_pause_after() - Pause resync on all devices that may not resync now
1559  * @device:     DRBD device.
1560  *
1561  * Called from process context only (admin command and after_state_ch).
1562  */
1563 static bool drbd_pause_after(struct drbd_device *device)
1564 {
1565         bool changed = false;
1566         struct drbd_device *odev;
1567         int i;
1568
1569         rcu_read_lock();
1570         idr_for_each_entry(&drbd_devices, odev, i) {
1571                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1572                         continue;
1573                 if (!_drbd_may_sync_now(odev) &&
1574                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1575                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1576                         changed = true;
1577         }
1578         rcu_read_unlock();
1579
1580         return changed;
1581 }
1582
1583 /**
1584  * drbd_resume_next() - Resume resync on all devices that may resync now
1585  * @device:     DRBD device.
1586  *
1587  * Called from process context only (admin command and worker).
1588  */
1589 static bool drbd_resume_next(struct drbd_device *device)
1590 {
1591         bool changed = false;
1592         struct drbd_device *odev;
1593         int i;
1594
1595         rcu_read_lock();
1596         idr_for_each_entry(&drbd_devices, odev, i) {
1597                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1598                         continue;
1599                 if (odev->state.aftr_isp) {
1600                         if (_drbd_may_sync_now(odev) &&
1601                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1602                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1603                                 changed = true;
1604                 }
1605         }
1606         rcu_read_unlock();
1607         return changed;
1608 }
1609
1610 void resume_next_sg(struct drbd_device *device)
1611 {
1612         lock_all_resources();
1613         drbd_resume_next(device);
1614         unlock_all_resources();
1615 }
1616
1617 void suspend_other_sg(struct drbd_device *device)
1618 {
1619         lock_all_resources();
1620         drbd_pause_after(device);
1621         unlock_all_resources();
1622 }
1623
1624 /* caller must lock_all_resources() */
1625 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1626 {
1627         struct drbd_device *odev;
1628         int resync_after;
1629
1630         if (o_minor == -1)
1631                 return NO_ERROR;
1632         if (o_minor < -1 || o_minor > MINORMASK)
1633                 return ERR_RESYNC_AFTER;
1634
1635         /* check for loops */
1636         odev = minor_to_device(o_minor);
1637         while (1) {
1638                 if (odev == device)
1639                         return ERR_RESYNC_AFTER_CYCLE;
1640
1641                 /* You are free to depend on diskless, non-existing,
1642                  * or not yet/no longer existing minors.
1643                  * We only reject dependency loops.
1644                  * We cannot follow the dependency chain beyond a detached or
1645                  * missing minor.
1646                  */
1647                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1648                         return NO_ERROR;
1649
1650                 rcu_read_lock();
1651                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1652                 rcu_read_unlock();
1653                 /* dependency chain ends here, no cycles. */
1654                 if (resync_after == -1)
1655                         return NO_ERROR;
1656
1657                 /* follow the dependency chain */
1658                 odev = minor_to_device(resync_after);
1659         }
1660 }
1661
1662 /* caller must lock_all_resources() */
1663 void drbd_resync_after_changed(struct drbd_device *device)
1664 {
1665         int changed;
1666
1667         do {
1668                 changed  = drbd_pause_after(device);
1669                 changed |= drbd_resume_next(device);
1670         } while (changed);
1671 }
1672
1673 void drbd_rs_controller_reset(struct drbd_device *device)
1674 {
1675         struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1676         struct fifo_buffer *plan;
1677
1678         atomic_set(&device->rs_sect_in, 0);
1679         atomic_set(&device->rs_sect_ev, 0);
1680         device->rs_in_flight = 0;
1681         device->rs_last_events =
1682                 (int)part_stat_read_accum(disk->part0, sectors);
1683
1684         /* Updating the RCU protected object in place is necessary since
1685            this function gets called from atomic context.
1686            It is valid since all other updates also lead to an completely
1687            empty fifo */
1688         rcu_read_lock();
1689         plan = rcu_dereference(device->rs_plan_s);
1690         plan->total = 0;
1691         fifo_set(plan, 0);
1692         rcu_read_unlock();
1693 }
1694
1695 void start_resync_timer_fn(struct timer_list *t)
1696 {
1697         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1698         drbd_device_post_work(device, RS_START);
1699 }
1700
1701 static void do_start_resync(struct drbd_device *device)
1702 {
1703         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1704                 drbd_warn(device, "postponing start_resync ...\n");
1705                 device->start_resync_timer.expires = jiffies + HZ/10;
1706                 add_timer(&device->start_resync_timer);
1707                 return;
1708         }
1709
1710         drbd_start_resync(device, C_SYNC_SOURCE);
1711         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1712 }
1713
1714 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1715 {
1716         bool csums_after_crash_only;
1717         rcu_read_lock();
1718         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1719         rcu_read_unlock();
1720         return connection->agreed_pro_version >= 89 &&          /* supported? */
1721                 connection->csums_tfm &&                        /* configured? */
1722                 (csums_after_crash_only == false                /* use for each resync? */
1723                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1724 }
1725
1726 /**
1727  * drbd_start_resync() - Start the resync process
1728  * @device:     DRBD device.
1729  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1730  *
1731  * This function might bring you directly into one of the
1732  * C_PAUSED_SYNC_* states.
1733  */
1734 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1735 {
1736         struct drbd_peer_device *peer_device = first_peer_device(device);
1737         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1738         union drbd_state ns;
1739         int r;
1740
1741         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1742                 drbd_err(device, "Resync already running!\n");
1743                 return;
1744         }
1745
1746         if (!connection) {
1747                 drbd_err(device, "No connection to peer, aborting!\n");
1748                 return;
1749         }
1750
1751         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1752                 if (side == C_SYNC_TARGET) {
1753                         /* Since application IO was locked out during C_WF_BITMAP_T and
1754                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1755                            we check that we might make the data inconsistent. */
1756                         r = drbd_khelper(device, "before-resync-target");
1757                         r = (r >> 8) & 0xff;
1758                         if (r > 0) {
1759                                 drbd_info(device, "before-resync-target handler returned %d, "
1760                                          "dropping connection.\n", r);
1761                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1762                                 return;
1763                         }
1764                 } else /* C_SYNC_SOURCE */ {
1765                         r = drbd_khelper(device, "before-resync-source");
1766                         r = (r >> 8) & 0xff;
1767                         if (r > 0) {
1768                                 if (r == 3) {
1769                                         drbd_info(device, "before-resync-source handler returned %d, "
1770                                                  "ignoring. Old userland tools?", r);
1771                                 } else {
1772                                         drbd_info(device, "before-resync-source handler returned %d, "
1773                                                  "dropping connection.\n", r);
1774                                         conn_request_state(connection,
1775                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1776                                         return;
1777                                 }
1778                         }
1779                 }
1780         }
1781
1782         if (current == connection->worker.task) {
1783                 /* The worker should not sleep waiting for state_mutex,
1784                    that can take long */
1785                 if (!mutex_trylock(device->state_mutex)) {
1786                         set_bit(B_RS_H_DONE, &device->flags);
1787                         device->start_resync_timer.expires = jiffies + HZ/5;
1788                         add_timer(&device->start_resync_timer);
1789                         return;
1790                 }
1791         } else {
1792                 mutex_lock(device->state_mutex);
1793         }
1794
1795         lock_all_resources();
1796         clear_bit(B_RS_H_DONE, &device->flags);
1797         /* Did some connection breakage or IO error race with us? */
1798         if (device->state.conn < C_CONNECTED
1799         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1800                 unlock_all_resources();
1801                 goto out;
1802         }
1803
1804         ns = drbd_read_state(device);
1805
1806         ns.aftr_isp = !_drbd_may_sync_now(device);
1807
1808         ns.conn = side;
1809
1810         if (side == C_SYNC_TARGET)
1811                 ns.disk = D_INCONSISTENT;
1812         else /* side == C_SYNC_SOURCE */
1813                 ns.pdsk = D_INCONSISTENT;
1814
1815         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1816         ns = drbd_read_state(device);
1817
1818         if (ns.conn < C_CONNECTED)
1819                 r = SS_UNKNOWN_ERROR;
1820
1821         if (r == SS_SUCCESS) {
1822                 unsigned long tw = drbd_bm_total_weight(device);
1823                 unsigned long now = jiffies;
1824                 int i;
1825
1826                 device->rs_failed    = 0;
1827                 device->rs_paused    = 0;
1828                 device->rs_same_csum = 0;
1829                 device->rs_last_sect_ev = 0;
1830                 device->rs_total     = tw;
1831                 device->rs_start     = now;
1832                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1833                         device->rs_mark_left[i] = tw;
1834                         device->rs_mark_time[i] = now;
1835                 }
1836                 drbd_pause_after(device);
1837                 /* Forget potentially stale cached per resync extent bit-counts.
1838                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1839                  * disabled, and know the disk state is ok. */
1840                 spin_lock(&device->al_lock);
1841                 lc_reset(device->resync);
1842                 device->resync_locked = 0;
1843                 device->resync_wenr = LC_FREE;
1844                 spin_unlock(&device->al_lock);
1845         }
1846         unlock_all_resources();
1847
1848         if (r == SS_SUCCESS) {
1849                 wake_up(&device->al_wait); /* for lc_reset() above */
1850                 /* reset rs_last_bcast when a resync or verify is started,
1851                  * to deal with potential jiffies wrap. */
1852                 device->rs_last_bcast = jiffies - HZ;
1853
1854                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1855                      drbd_conn_str(ns.conn),
1856                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1857                      (unsigned long) device->rs_total);
1858                 if (side == C_SYNC_TARGET) {
1859                         device->bm_resync_fo = 0;
1860                         device->use_csums = use_checksum_based_resync(connection, device);
1861                 } else {
1862                         device->use_csums = false;
1863                 }
1864
1865                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1866                  * with w_send_oos, or the sync target will get confused as to
1867                  * how much bits to resync.  We cannot do that always, because for an
1868                  * empty resync and protocol < 95, we need to do it here, as we call
1869                  * drbd_resync_finished from here in that case.
1870                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1871                  * and from after_state_ch otherwise. */
1872                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1873                         drbd_gen_and_send_sync_uuid(peer_device);
1874
1875                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1876                         /* This still has a race (about when exactly the peers
1877                          * detect connection loss) that can lead to a full sync
1878                          * on next handshake. In 8.3.9 we fixed this with explicit
1879                          * resync-finished notifications, but the fix
1880                          * introduces a protocol change.  Sleeping for some
1881                          * time longer than the ping interval + timeout on the
1882                          * SyncSource, to give the SyncTarget the chance to
1883                          * detect connection loss, then waiting for a ping
1884                          * response (implicit in drbd_resync_finished) reduces
1885                          * the race considerably, but does not solve it. */
1886                         if (side == C_SYNC_SOURCE) {
1887                                 struct net_conf *nc;
1888                                 int timeo;
1889
1890                                 rcu_read_lock();
1891                                 nc = rcu_dereference(connection->net_conf);
1892                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1893                                 rcu_read_unlock();
1894                                 schedule_timeout_interruptible(timeo);
1895                         }
1896                         drbd_resync_finished(device);
1897                 }
1898
1899                 drbd_rs_controller_reset(device);
1900                 /* ns.conn may already be != device->state.conn,
1901                  * we may have been paused in between, or become paused until
1902                  * the timer triggers.
1903                  * No matter, that is handled in resync_timer_fn() */
1904                 if (ns.conn == C_SYNC_TARGET)
1905                         mod_timer(&device->resync_timer, jiffies);
1906
1907                 drbd_md_sync(device);
1908         }
1909         put_ldev(device);
1910 out:
1911         mutex_unlock(device->state_mutex);
1912 }
1913
1914 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1915 {
1916         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1917         device->rs_last_bcast = jiffies;
1918
1919         if (!get_ldev(device))
1920                 return;
1921
1922         drbd_bm_write_lazy(device, 0);
1923         if (resync_done && is_sync_state(device->state.conn))
1924                 drbd_resync_finished(device);
1925
1926         drbd_bcast_event(device, &sib);
1927         /* update timestamp, in case it took a while to write out stuff */
1928         device->rs_last_bcast = jiffies;
1929         put_ldev(device);
1930 }
1931
1932 static void drbd_ldev_destroy(struct drbd_device *device)
1933 {
1934         lc_destroy(device->resync);
1935         device->resync = NULL;
1936         lc_destroy(device->act_log);
1937         device->act_log = NULL;
1938
1939         __acquire(local);
1940         drbd_backing_dev_free(device, device->ldev);
1941         device->ldev = NULL;
1942         __release(local);
1943
1944         clear_bit(GOING_DISKLESS, &device->flags);
1945         wake_up(&device->misc_wait);
1946 }
1947
1948 static void go_diskless(struct drbd_device *device)
1949 {
1950         D_ASSERT(device, device->state.disk == D_FAILED);
1951         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1952          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1953          * the protected members anymore, though, so once put_ldev reaches zero
1954          * again, it will be safe to free them. */
1955
1956         /* Try to write changed bitmap pages, read errors may have just
1957          * set some bits outside the area covered by the activity log.
1958          *
1959          * If we have an IO error during the bitmap writeout,
1960          * we will want a full sync next time, just in case.
1961          * (Do we want a specific meta data flag for this?)
1962          *
1963          * If that does not make it to stable storage either,
1964          * we cannot do anything about that anymore.
1965          *
1966          * We still need to check if both bitmap and ldev are present, we may
1967          * end up here after a failed attach, before ldev was even assigned.
1968          */
1969         if (device->bitmap && device->ldev) {
1970                 /* An interrupted resync or similar is allowed to recounts bits
1971                  * while we detach.
1972                  * Any modifications would not be expected anymore, though.
1973                  */
1974                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1975                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1976                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1977                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1978                                 drbd_md_sync(device);
1979                         }
1980                 }
1981         }
1982
1983         drbd_force_state(device, NS(disk, D_DISKLESS));
1984 }
1985
1986 static int do_md_sync(struct drbd_device *device)
1987 {
1988         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1989         drbd_md_sync(device);
1990         return 0;
1991 }
1992
1993 /* only called from drbd_worker thread, no locking */
1994 void __update_timing_details(
1995                 struct drbd_thread_timing_details *tdp,
1996                 unsigned int *cb_nr,
1997                 void *cb,
1998                 const char *fn, const unsigned int line)
1999 {
2000         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2001         struct drbd_thread_timing_details *td = tdp + i;
2002
2003         td->start_jif = jiffies;
2004         td->cb_addr = cb;
2005         td->caller_fn = fn;
2006         td->line = line;
2007         td->cb_nr = *cb_nr;
2008
2009         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2010         td = tdp + i;
2011         memset(td, 0, sizeof(*td));
2012
2013         ++(*cb_nr);
2014 }
2015
2016 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2017 {
2018         if (test_bit(MD_SYNC, &todo))
2019                 do_md_sync(device);
2020         if (test_bit(RS_DONE, &todo) ||
2021             test_bit(RS_PROGRESS, &todo))
2022                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2023         if (test_bit(GO_DISKLESS, &todo))
2024                 go_diskless(device);
2025         if (test_bit(DESTROY_DISK, &todo))
2026                 drbd_ldev_destroy(device);
2027         if (test_bit(RS_START, &todo))
2028                 do_start_resync(device);
2029 }
2030
2031 #define DRBD_DEVICE_WORK_MASK   \
2032         ((1UL << GO_DISKLESS)   \
2033         |(1UL << DESTROY_DISK)  \
2034         |(1UL << MD_SYNC)       \
2035         |(1UL << RS_START)      \
2036         |(1UL << RS_PROGRESS)   \
2037         |(1UL << RS_DONE)       \
2038         )
2039
2040 static unsigned long get_work_bits(unsigned long *flags)
2041 {
2042         unsigned long old, new;
2043         do {
2044                 old = *flags;
2045                 new = old & ~DRBD_DEVICE_WORK_MASK;
2046         } while (cmpxchg(flags, old, new) != old);
2047         return old & DRBD_DEVICE_WORK_MASK;
2048 }
2049
2050 static void do_unqueued_work(struct drbd_connection *connection)
2051 {
2052         struct drbd_peer_device *peer_device;
2053         int vnr;
2054
2055         rcu_read_lock();
2056         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2057                 struct drbd_device *device = peer_device->device;
2058                 unsigned long todo = get_work_bits(&device->flags);
2059                 if (!todo)
2060                         continue;
2061
2062                 kref_get(&device->kref);
2063                 rcu_read_unlock();
2064                 do_device_work(device, todo);
2065                 kref_put(&device->kref, drbd_destroy_device);
2066                 rcu_read_lock();
2067         }
2068         rcu_read_unlock();
2069 }
2070
2071 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2072 {
2073         spin_lock_irq(&queue->q_lock);
2074         list_splice_tail_init(&queue->q, work_list);
2075         spin_unlock_irq(&queue->q_lock);
2076         return !list_empty(work_list);
2077 }
2078
2079 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2080 {
2081         DEFINE_WAIT(wait);
2082         struct net_conf *nc;
2083         int uncork, cork;
2084
2085         dequeue_work_batch(&connection->sender_work, work_list);
2086         if (!list_empty(work_list))
2087                 return;
2088
2089         /* Still nothing to do?
2090          * Maybe we still need to close the current epoch,
2091          * even if no new requests are queued yet.
2092          *
2093          * Also, poke TCP, just in case.
2094          * Then wait for new work (or signal). */
2095         rcu_read_lock();
2096         nc = rcu_dereference(connection->net_conf);
2097         uncork = nc ? nc->tcp_cork : 0;
2098         rcu_read_unlock();
2099         if (uncork) {
2100                 mutex_lock(&connection->data.mutex);
2101                 if (connection->data.socket)
2102                         tcp_sock_set_cork(connection->data.socket->sk, false);
2103                 mutex_unlock(&connection->data.mutex);
2104         }
2105
2106         for (;;) {
2107                 int send_barrier;
2108                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2109                 spin_lock_irq(&connection->resource->req_lock);
2110                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2111                 if (!list_empty(&connection->sender_work.q))
2112                         list_splice_tail_init(&connection->sender_work.q, work_list);
2113                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2114                 if (!list_empty(work_list) || signal_pending(current)) {
2115                         spin_unlock_irq(&connection->resource->req_lock);
2116                         break;
2117                 }
2118
2119                 /* We found nothing new to do, no to-be-communicated request,
2120                  * no other work item.  We may still need to close the last
2121                  * epoch.  Next incoming request epoch will be connection ->
2122                  * current transfer log epoch number.  If that is different
2123                  * from the epoch of the last request we communicated, it is
2124                  * safe to send the epoch separating barrier now.
2125                  */
2126                 send_barrier =
2127                         atomic_read(&connection->current_tle_nr) !=
2128                         connection->send.current_epoch_nr;
2129                 spin_unlock_irq(&connection->resource->req_lock);
2130
2131                 if (send_barrier)
2132                         maybe_send_barrier(connection,
2133                                         connection->send.current_epoch_nr + 1);
2134
2135                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2136                         break;
2137
2138                 /* drbd_send() may have called flush_signals() */
2139                 if (get_t_state(&connection->worker) != RUNNING)
2140                         break;
2141
2142                 schedule();
2143                 /* may be woken up for other things but new work, too,
2144                  * e.g. if the current epoch got closed.
2145                  * In which case we send the barrier above. */
2146         }
2147         finish_wait(&connection->sender_work.q_wait, &wait);
2148
2149         /* someone may have changed the config while we have been waiting above. */
2150         rcu_read_lock();
2151         nc = rcu_dereference(connection->net_conf);
2152         cork = nc ? nc->tcp_cork : 0;
2153         rcu_read_unlock();
2154         mutex_lock(&connection->data.mutex);
2155         if (connection->data.socket) {
2156                 if (cork)
2157                         tcp_sock_set_cork(connection->data.socket->sk, true);
2158                 else if (!uncork)
2159                         tcp_sock_set_cork(connection->data.socket->sk, false);
2160         }
2161         mutex_unlock(&connection->data.mutex);
2162 }
2163
2164 int drbd_worker(struct drbd_thread *thi)
2165 {
2166         struct drbd_connection *connection = thi->connection;
2167         struct drbd_work *w = NULL;
2168         struct drbd_peer_device *peer_device;
2169         LIST_HEAD(work_list);
2170         int vnr;
2171
2172         while (get_t_state(thi) == RUNNING) {
2173                 drbd_thread_current_set_cpu(thi);
2174
2175                 if (list_empty(&work_list)) {
2176                         update_worker_timing_details(connection, wait_for_work);
2177                         wait_for_work(connection, &work_list);
2178                 }
2179
2180                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2181                         update_worker_timing_details(connection, do_unqueued_work);
2182                         do_unqueued_work(connection);
2183                 }
2184
2185                 if (signal_pending(current)) {
2186                         flush_signals(current);
2187                         if (get_t_state(thi) == RUNNING) {
2188                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2189                                 continue;
2190                         }
2191                         break;
2192                 }
2193
2194                 if (get_t_state(thi) != RUNNING)
2195                         break;
2196
2197                 if (!list_empty(&work_list)) {
2198                         w = list_first_entry(&work_list, struct drbd_work, list);
2199                         list_del_init(&w->list);
2200                         update_worker_timing_details(connection, w->cb);
2201                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2202                                 continue;
2203                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2204                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2205                 }
2206         }
2207
2208         do {
2209                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2210                         update_worker_timing_details(connection, do_unqueued_work);
2211                         do_unqueued_work(connection);
2212                 }
2213                 if (!list_empty(&work_list)) {
2214                         w = list_first_entry(&work_list, struct drbd_work, list);
2215                         list_del_init(&w->list);
2216                         update_worker_timing_details(connection, w->cb);
2217                         w->cb(w, 1);
2218                 } else
2219                         dequeue_work_batch(&connection->sender_work, &work_list);
2220         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2221
2222         rcu_read_lock();
2223         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2224                 struct drbd_device *device = peer_device->device;
2225                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2226                 kref_get(&device->kref);
2227                 rcu_read_unlock();
2228                 drbd_device_cleanup(device);
2229                 kref_put(&device->kref, drbd_destroy_device);
2230                 rcu_read_lock();
2231         }
2232         rcu_read_unlock();
2233
2234         return 0;
2235 }