Merge tag 'docs-6.9' of git://git.lwn.net/linux
[linux-2.6-microblaze.git] / drivers / block / drbd / drbd_worker.c
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3    drbd_worker.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11
12 */
13
14 #include <linux/module.h>
15 #include <linux/drbd.h>
16 #include <linux/sched/signal.h>
17 #include <linux/wait.h>
18 #include <linux/mm.h>
19 #include <linux/memcontrol.h>
20 #include <linux/mm_inline.h>
21 #include <linux/slab.h>
22 #include <linux/random.h>
23 #include <linux/string.h>
24 #include <linux/scatterlist.h>
25 #include <linux/part_stat.h>
26
27 #include "drbd_int.h"
28 #include "drbd_protocol.h"
29 #include "drbd_req.h"
30
31 static int make_ov_request(struct drbd_peer_device *, int);
32 static int make_resync_request(struct drbd_peer_device *, int);
33
34 /* endio handlers:
35  *   drbd_md_endio (defined here)
36  *   drbd_request_endio (defined here)
37  *   drbd_peer_request_endio (defined here)
38  *   drbd_bm_endio (defined in drbd_bitmap.c)
39  *
40  * For all these callbacks, note the following:
41  * The callbacks will be called in irq context by the IDE drivers,
42  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
43  * Try to get the locking right :)
44  *
45  */
46
47 /* used for synchronous meta data and bitmap IO
48  * submitted by drbd_md_sync_page_io()
49  */
50 void drbd_md_endio(struct bio *bio)
51 {
52         struct drbd_device *device;
53
54         device = bio->bi_private;
55         device->md_io.error = blk_status_to_errno(bio->bi_status);
56
57         /* special case: drbd_md_read() during drbd_adm_attach() */
58         if (device->ldev)
59                 put_ldev(device);
60         bio_put(bio);
61
62         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
63          * to timeout on the lower level device, and eventually detach from it.
64          * If this io completion runs after that timeout expired, this
65          * drbd_md_put_buffer() may allow us to finally try and re-attach.
66          * During normal operation, this only puts that extra reference
67          * down to 1 again.
68          * Make sure we first drop the reference, and only then signal
69          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
70          * next drbd_md_sync_page_io(), that we trigger the
71          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
72          */
73         drbd_md_put_buffer(device);
74         device->md_io.done = 1;
75         wake_up(&device->misc_wait);
76 }
77
78 /* reads on behalf of the partner,
79  * "submitted" by the receiver
80  */
81 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
82 {
83         unsigned long flags = 0;
84         struct drbd_peer_device *peer_device = peer_req->peer_device;
85         struct drbd_device *device = peer_device->device;
86
87         spin_lock_irqsave(&device->resource->req_lock, flags);
88         device->read_cnt += peer_req->i.size >> 9;
89         list_del(&peer_req->w.list);
90         if (list_empty(&device->read_ee))
91                 wake_up(&device->ee_wait);
92         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
93                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
94         spin_unlock_irqrestore(&device->resource->req_lock, flags);
95
96         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
97         put_ldev(device);
98 }
99
/*
 * Final completion stage for writes on behalf of the partner, or resync
 * writes, "submitted" by the receiver.
 *
 * Called from drbd_peer_request_endio() once the last pending bio of
 * @peer_req has completed.  Moves @peer_req onto device->done_ee, queues
 * the ack sender work when the connection state allows it, and drops the
 * ldev reference with put_ldev().
 */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock)
	 * So copy everything that is needed after the unlock into locals now. */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	/* the activity log completion is done by this function (see below),
	 * so clear the flag on the request itself */
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	if (peer_req->flags & EE_WAS_ERROR) {
		/* In protocol != C, we usually do not send write acks.
		 * In case of a write error, send the neg ack anyways. */
		if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
			inc_unacked(device);
		drbd_set_out_of_sync(peer_device, peer_req->i.sector, peer_req->i.size);
	}

	spin_lock_irqsave(&device->resource->req_lock, flags);
	/* i.size is in bytes, writ_cnt counts 512 byte sectors */
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	/* wake ee_wait waiters (after the unlock) if the list this request
	 * was taken from, sync_ee or active_ee, is empty now */
	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_OP_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);

	if (connection->cstate >= C_WF_REPORT_PARAMS) {
		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
		/* queue_work() returned false: no new work item was queued,
		 * so give back the reference we just took for it */
		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
			kref_put(&device->kref, drbd_destroy_device);
	}
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	/* from here on only the local copies (i, block_id, ...) are used */
	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	put_ldev(device);
}
167
168 /* writes on behalf of the partner, or resync writes,
169  * "submitted" by the receiver.
170  */
171 void drbd_peer_request_endio(struct bio *bio)
172 {
173         struct drbd_peer_request *peer_req = bio->bi_private;
174         struct drbd_device *device = peer_req->peer_device->device;
175         bool is_write = bio_data_dir(bio) == WRITE;
176         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
177                           bio_op(bio) == REQ_OP_DISCARD;
178
179         if (bio->bi_status && drbd_ratelimit())
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_status,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_status)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
/*
 * panic() with a message naming the affected device (minor, resource name,
 * volume number).  Called by drbd_request_endio() when a locally aborted
 * request later completes without error.
 */
static void
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
{
	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
		device->minor, device->resource->name, device->vnr);
}
203
204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205  */
206 void drbd_request_endio(struct bio *bio)
207 {
208         unsigned long flags;
209         struct drbd_request *req = bio->bi_private;
210         struct drbd_device *device = req->device;
211         struct bio_and_error m;
212         enum drbd_req_event what;
213
214         /* If this request was aborted locally before,
215          * but now was completed "successfully",
216          * chances are that this caused arbitrary data corruption.
217          *
218          * "aborting" requests, or force-detaching the disk, is intended for
219          * completely blocked/hung local backing devices which do no longer
220          * complete requests at all, not even do error completions.  In this
221          * situation, usually a hard-reset and failover is the only way out.
222          *
223          * By "aborting", basically faking a local error-completion,
224          * we allow for a more graceful swichover by cleanly migrating services.
225          * Still the affected node has to be rebooted "soon".
226          *
227          * By completing these requests, we allow the upper layers to re-use
228          * the associated data pages.
229          *
230          * If later the local backing device "recovers", and now DMAs some data
231          * from disk into the original request pages, in the best case it will
232          * just put random data into unused pages; but typically it will corrupt
233          * meanwhile completely unrelated data, causing all sorts of damage.
234          *
235          * Which means delayed successful completion,
236          * especially for READ requests,
237          * is a reason to panic().
238          *
239          * We assume that a delayed *error* completion is OK,
240          * though we still will complain noisily about it.
241          */
242         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
243                 if (drbd_ratelimit())
244                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
245
246                 if (!bio->bi_status)
247                         drbd_panic_after_delayed_completion_of_aborted_request(device);
248         }
249
250         /* to avoid recursion in __req_mod */
251         if (unlikely(bio->bi_status)) {
252                 switch (bio_op(bio)) {
253                 case REQ_OP_WRITE_ZEROES:
254                 case REQ_OP_DISCARD:
255                         if (bio->bi_status == BLK_STS_NOTSUPP)
256                                 what = DISCARD_COMPLETED_NOTSUPP;
257                         else
258                                 what = DISCARD_COMPLETED_WITH_ERROR;
259                         break;
260                 case REQ_OP_READ:
261                         if (bio->bi_opf & REQ_RAHEAD)
262                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
263                         else
264                                 what = READ_COMPLETED_WITH_ERROR;
265                         break;
266                 default:
267                         what = WRITE_COMPLETED_WITH_ERROR;
268                         break;
269                 }
270         } else {
271                 what = COMPLETED_OK;
272         }
273
274         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
275         bio_put(bio);
276
277         /* not req_mod(), we need irqsave here! */
278         spin_lock_irqsave(&device->resource->req_lock, flags);
279         __req_mod(req, what, NULL, &m);
280         spin_unlock_irqrestore(&device->resource->req_lock, flags);
281         put_ldev(device);
282
283         if (m.bio)
284                 complete_master_bio(device, &m);
285 }
286
287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
288 {
289         SHASH_DESC_ON_STACK(desc, tfm);
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293         void *src;
294
295         desc->tfm = tfm;
296
297         crypto_shash_init(desc);
298
299         src = kmap_atomic(page);
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 crypto_shash_update(desc, src, PAGE_SIZE);
303                 kunmap_atomic(src);
304                 page = tmp;
305                 src = kmap_atomic(page);
306         }
307         /* and now the last, possibly only partially used page */
308         len = peer_req->i.size & (PAGE_SIZE - 1);
309         crypto_shash_update(desc, src, len ?: PAGE_SIZE);
310         kunmap_atomic(src);
311
312         crypto_shash_final(desc, digest);
313         shash_desc_zero(desc);
314 }
315
316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
317 {
318         SHASH_DESC_ON_STACK(desc, tfm);
319         struct bio_vec bvec;
320         struct bvec_iter iter;
321
322         desc->tfm = tfm;
323
324         crypto_shash_init(desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 u8 *src;
328
329                 src = bvec_kmap_local(&bvec);
330                 crypto_shash_update(desc, src, bvec.bv_len);
331                 kunmap_local(src);
332         }
333         crypto_shash_final(desc, digest);
334         shash_desc_zero(desc);
335 }
336
337 /* MAYBE merge common code with w_e_end_ov_req */
338 static int w_e_send_csum(struct drbd_work *w, int cancel)
339 {
340         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
341         struct drbd_peer_device *peer_device = peer_req->peer_device;
342         struct drbd_device *device = peer_device->device;
343         int digest_size;
344         void *digest;
345         int err = 0;
346
347         if (unlikely(cancel))
348                 goto out;
349
350         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
351                 goto out;
352
353         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
354         digest = kmalloc(digest_size, GFP_NOIO);
355         if (digest) {
356                 sector_t sector = peer_req->i.sector;
357                 unsigned int size = peer_req->i.size;
358                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
359                 /* Free peer_req and pages before send.
360                  * In case we block on congestion, we could otherwise run into
361                  * some distributed deadlock, if the other side blocks on
362                  * congestion as well, because our receiver blocks in
363                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
364                 drbd_free_peer_req(device, peer_req);
365                 peer_req = NULL;
366                 inc_rs_pending(peer_device);
367                 err = drbd_send_drequest_csum(peer_device, sector, size,
368                                               digest, digest_size,
369                                               P_CSUM_RS_REQUEST);
370                 kfree(digest);
371         } else {
372                 drbd_err(device, "kmalloc() of digest failed.\n");
373                 err = -ENOMEM;
374         }
375
376 out:
377         if (peer_req)
378                 drbd_free_peer_req(device, peer_req);
379
380         if (unlikely(err))
381                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
382         return err;
383 }
384
385 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
386
387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
388 {
389         struct drbd_device *device = peer_device->device;
390         struct drbd_peer_request *peer_req;
391
392         if (!get_ldev(device))
393                 return -EIO;
394
395         /* GFP_TRY, because if there is no memory available right now, this may
396          * be rescheduled for later. It is "only" background resync, after all. */
397         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
398                                        size, size, GFP_TRY);
399         if (!peer_req)
400                 goto defer;
401
402         peer_req->w.cb = w_e_send_csum;
403         peer_req->opf = REQ_OP_READ;
404         spin_lock_irq(&device->resource->req_lock);
405         list_add_tail(&peer_req->w.list, &device->read_ee);
406         spin_unlock_irq(&device->resource->req_lock);
407
408         atomic_add(size >> 9, &device->rs_sect_ev);
409         if (drbd_submit_peer_request(peer_req) == 0)
410                 return 0;
411
412         /* If it failed because of ENOMEM, retry should help.  If it failed
413          * because bio_add_page failed (probably broken lower level driver),
414          * retry may or may not help.
415          * If it does not, you may need to force disconnect. */
416         spin_lock_irq(&device->resource->req_lock);
417         list_del(&peer_req->w.list);
418         spin_unlock_irq(&device->resource->req_lock);
419
420         drbd_free_peer_req(device, peer_req);
421 defer:
422         put_ldev(device);
423         return -EAGAIN;
424 }
425
426 int w_resync_timer(struct drbd_work *w, int cancel)
427 {
428         struct drbd_device *device =
429                 container_of(w, struct drbd_device, resync_work);
430
431         switch (device->state.conn) {
432         case C_VERIFY_S:
433                 make_ov_request(first_peer_device(device), cancel);
434                 break;
435         case C_SYNC_TARGET:
436                 make_resync_request(first_peer_device(device), cancel);
437                 break;
438         }
439
440         return 0;
441 }
442
443 void resync_timer_fn(struct timer_list *t)
444 {
445         struct drbd_device *device = from_timer(device, t, resync_timer);
446
447         drbd_queue_work_if_unqueued(
448                 &first_peer_device(device)->connection->sender_work,
449                 &device->resync_work);
450 }
451
452 static void fifo_set(struct fifo_buffer *fb, int value)
453 {
454         int i;
455
456         for (i = 0; i < fb->size; i++)
457                 fb->values[i] = value;
458 }
459
460 static int fifo_push(struct fifo_buffer *fb, int value)
461 {
462         int ov;
463
464         ov = fb->values[fb->head_index];
465         fb->values[fb->head_index++] = value;
466
467         if (fb->head_index >= fb->size)
468                 fb->head_index = 0;
469
470         return ov;
471 }
472
473 static void fifo_add_val(struct fifo_buffer *fb, int value)
474 {
475         int i;
476
477         for (i = 0; i < fb->size; i++)
478                 fb->values[i] += value;
479 }
480
481 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
482 {
483         struct fifo_buffer *fb;
484
485         fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
486         if (!fb)
487                 return NULL;
488
489         fb->head_index = 0;
490         fb->size = fifo_size;
491         fb->total = 0;
492
493         return fb;
494 }
495
496 static int drbd_rs_controller(struct drbd_peer_device *peer_device, unsigned int sect_in)
497 {
498         struct drbd_device *device = peer_device->device;
499         struct disk_conf *dc;
500         unsigned int want;     /* The number of sectors we want in-flight */
501         int req_sect; /* Number of sectors to request in this turn */
502         int correction; /* Number of sectors more we need in-flight */
503         int cps; /* correction per invocation of drbd_rs_controller() */
504         int steps; /* Number of time steps to plan ahead */
505         int curr_corr;
506         int max_sect;
507         struct fifo_buffer *plan;
508
509         dc = rcu_dereference(device->ldev->disk_conf);
510         plan = rcu_dereference(device->rs_plan_s);
511
512         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
513
514         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
515                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
516         } else { /* normal path */
517                 want = dc->c_fill_target ? dc->c_fill_target :
518                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
519         }
520
521         correction = want - device->rs_in_flight - plan->total;
522
523         /* Plan ahead */
524         cps = correction / steps;
525         fifo_add_val(plan, cps);
526         plan->total += cps * steps;
527
528         /* What we do in this step */
529         curr_corr = fifo_push(plan, 0);
530         plan->total -= curr_corr;
531
532         req_sect = sect_in + curr_corr;
533         if (req_sect < 0)
534                 req_sect = 0;
535
536         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
537         if (req_sect > max_sect)
538                 req_sect = max_sect;
539
540         /*
541         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
542                  sect_in, device->rs_in_flight, want, correction,
543                  steps, cps, device->rs_planed, curr_corr, req_sect);
544         */
545
546         return req_sect;
547 }
548
549 static int drbd_rs_number_requests(struct drbd_peer_device *peer_device)
550 {
551         struct drbd_device *device = peer_device->device;
552         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
553         int number, mxb;
554
555         sect_in = atomic_xchg(&device->rs_sect_in, 0);
556         device->rs_in_flight -= sect_in;
557
558         rcu_read_lock();
559         mxb = drbd_get_max_buffers(device) / 2;
560         if (rcu_dereference(device->rs_plan_s)->size) {
561                 number = drbd_rs_controller(peer_device, sect_in) >> (BM_BLOCK_SHIFT - 9);
562                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
563         } else {
564                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
565                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
566         }
567         rcu_read_unlock();
568
569         /* Don't have more than "max-buffers"/2 in-flight.
570          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
571          * potentially causing a distributed deadlock on congestion during
572          * online-verify or (checksum-based) resync, if max-buffers,
573          * socket buffer sizes and resync rate settings are mis-configured. */
574
575         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
576          * mxb (as used here, and in drbd_alloc_pages on the peer) is
577          * "number of pages" (typically also 4k),
578          * but "rs_in_flight" is in "sectors" (512 Byte). */
579         if (mxb - device->rs_in_flight/8 < number)
580                 number = mxb - device->rs_in_flight/8;
581
582         return number;
583 }
584
585 static int make_resync_request(struct drbd_peer_device *const peer_device, int cancel)
586 {
587         struct drbd_device *const device = peer_device->device;
588         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
589         unsigned long bit;
590         sector_t sector;
591         const sector_t capacity = get_capacity(device->vdisk);
592         int max_bio_size;
593         int number, rollback_i, size;
594         int align, requeue = 0;
595         int i = 0;
596         int discard_granularity = 0;
597
598         if (unlikely(cancel))
599                 return 0;
600
601         if (device->rs_total == 0) {
602                 /* empty resync? */
603                 drbd_resync_finished(peer_device);
604                 return 0;
605         }
606
607         if (!get_ldev(device)) {
608                 /* Since we only need to access device->rsync a
609                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
610                    to continue resync with a broken disk makes no sense at
611                    all */
612                 drbd_err(device, "Disk broke down during resync!\n");
613                 return 0;
614         }
615
616         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
617                 rcu_read_lock();
618                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
619                 rcu_read_unlock();
620         }
621
622         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
623         number = drbd_rs_number_requests(peer_device);
624         if (number <= 0)
625                 goto requeue;
626
627         for (i = 0; i < number; i++) {
628                 /* Stop generating RS requests when half of the send buffer is filled,
629                  * but notify TCP that we'd like to have more space. */
630                 mutex_lock(&connection->data.mutex);
631                 if (connection->data.socket) {
632                         struct sock *sk = connection->data.socket->sk;
633                         int queued = sk->sk_wmem_queued;
634                         int sndbuf = sk->sk_sndbuf;
635                         if (queued > sndbuf / 2) {
636                                 requeue = 1;
637                                 if (sk->sk_socket)
638                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
639                         }
640                 } else
641                         requeue = 1;
642                 mutex_unlock(&connection->data.mutex);
643                 if (requeue)
644                         goto requeue;
645
646 next_sector:
647                 size = BM_BLOCK_SIZE;
648                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
649
650                 if (bit == DRBD_END_OF_BITMAP) {
651                         device->bm_resync_fo = drbd_bm_bits(device);
652                         put_ldev(device);
653                         return 0;
654                 }
655
656                 sector = BM_BIT_TO_SECT(bit);
657
658                 if (drbd_try_rs_begin_io(peer_device, sector)) {
659                         device->bm_resync_fo = bit;
660                         goto requeue;
661                 }
662                 device->bm_resync_fo = bit + 1;
663
664                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
665                         drbd_rs_complete_io(device, sector);
666                         goto next_sector;
667                 }
668
669 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
670                 /* try to find some adjacent bits.
671                  * we stop if we have already the maximum req size.
672                  *
673                  * Additionally always align bigger requests, in order to
674                  * be prepared for all stripe sizes of software RAIDs.
675                  */
676                 align = 1;
677                 rollback_i = i;
678                 while (i < number) {
679                         if (size + BM_BLOCK_SIZE > max_bio_size)
680                                 break;
681
682                         /* Be always aligned */
683                         if (sector & ((1<<(align+3))-1))
684                                 break;
685
686                         if (discard_granularity && size == discard_granularity)
687                                 break;
688
689                         /* do not cross extent boundaries */
690                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
691                                 break;
692                         /* now, is it actually dirty, after all?
693                          * caution, drbd_bm_test_bit is tri-state for some
694                          * obscure reason; ( b == 0 ) would get the out-of-band
695                          * only accidentally right because of the "oddly sized"
696                          * adjustment below */
697                         if (drbd_bm_test_bit(device, bit+1) != 1)
698                                 break;
699                         bit++;
700                         size += BM_BLOCK_SIZE;
701                         if ((BM_BLOCK_SIZE << align) <= size)
702                                 align++;
703                         i++;
704                 }
705                 /* if we merged some,
706                  * reset the offset to start the next drbd_bm_find_next from */
707                 if (size > BM_BLOCK_SIZE)
708                         device->bm_resync_fo = bit + 1;
709 #endif
710
711                 /* adjust very last sectors, in case we are oddly sized */
712                 if (sector + (size>>9) > capacity)
713                         size = (capacity-sector)<<9;
714
715                 if (device->use_csums) {
716                         switch (read_for_csum(peer_device, sector, size)) {
717                         case -EIO: /* Disk failure */
718                                 put_ldev(device);
719                                 return -EIO;
720                         case -EAGAIN: /* allocation failed, or ldev busy */
721                                 drbd_rs_complete_io(device, sector);
722                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
723                                 i = rollback_i;
724                                 goto requeue;
725                         case 0:
726                                 /* everything ok */
727                                 break;
728                         default:
729                                 BUG();
730                         }
731                 } else {
732                         int err;
733
734                         inc_rs_pending(peer_device);
735                         err = drbd_send_drequest(peer_device,
736                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
737                                                  sector, size, ID_SYNCER);
738                         if (err) {
739                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
740                                 dec_rs_pending(peer_device);
741                                 put_ldev(device);
742                                 return err;
743                         }
744                 }
745         }
746
747         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
748                 /* last syncer _request_ was sent,
749                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
750                  * next sync group will resume), as soon as we receive the last
751                  * resync data block, and the last bit is cleared.
752                  * until then resync "work" is "inactive" ...
753                  */
754                 put_ldev(device);
755                 return 0;
756         }
757
758  requeue:
759         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
760         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
761         put_ldev(device);
762         return 0;
763 }
764
765 static int make_ov_request(struct drbd_peer_device *peer_device, int cancel)
766 {
767         struct drbd_device *device = peer_device->device;
768         int number, i, size;
769         sector_t sector;
770         const sector_t capacity = get_capacity(device->vdisk);
771         bool stop_sector_reached = false;
772
773         if (unlikely(cancel))
774                 return 1;
775
776         number = drbd_rs_number_requests(peer_device);
777
778         sector = device->ov_position;
779         for (i = 0; i < number; i++) {
780                 if (sector >= capacity)
781                         return 1;
782
783                 /* We check for "finished" only in the reply path:
784                  * w_e_end_ov_reply().
785                  * We need to send at least one request out. */
786                 stop_sector_reached = i > 0
787                         && verify_can_do_stop_sector(device)
788                         && sector >= device->ov_stop_sector;
789                 if (stop_sector_reached)
790                         break;
791
792                 size = BM_BLOCK_SIZE;
793
794                 if (drbd_try_rs_begin_io(peer_device, sector)) {
795                         device->ov_position = sector;
796                         goto requeue;
797                 }
798
799                 if (sector + (size>>9) > capacity)
800                         size = (capacity-sector)<<9;
801
802                 inc_rs_pending(peer_device);
803                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
804                         dec_rs_pending(peer_device);
805                         return 0;
806                 }
807                 sector += BM_SECT_PER_BIT;
808         }
809         device->ov_position = sector;
810
811  requeue:
812         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
813         if (i == 0 || !stop_sector_reached)
814                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
815         return 1;
816 }
817
818 int w_ov_finished(struct drbd_work *w, int cancel)
819 {
820         struct drbd_device_work *dw =
821                 container_of(w, struct drbd_device_work, w);
822         struct drbd_device *device = dw->device;
823         kfree(dw);
824         ov_out_of_sync_print(first_peer_device(device));
825         drbd_resync_finished(first_peer_device(device));
826
827         return 0;
828 }
829
830 static int w_resync_finished(struct drbd_work *w, int cancel)
831 {
832         struct drbd_device_work *dw =
833                 container_of(w, struct drbd_device_work, w);
834         struct drbd_device *device = dw->device;
835         kfree(dw);
836
837         drbd_resync_finished(first_peer_device(device));
838
839         return 0;
840 }
841
842 static void ping_peer(struct drbd_device *device)
843 {
844         struct drbd_connection *connection = first_peer_device(device)->connection;
845
846         clear_bit(GOT_PING_ACK, &connection->flags);
847         request_ping(connection);
848         wait_event(connection->ping_wait,
849                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
850 }
851
852 int drbd_resync_finished(struct drbd_peer_device *peer_device)
853 {
854         struct drbd_device *device = peer_device->device;
855         struct drbd_connection *connection = peer_device->connection;
856         unsigned long db, dt, dbdt;
857         unsigned long n_oos;
858         union drbd_state os, ns;
859         struct drbd_device_work *dw;
860         char *khelper_cmd = NULL;
861         int verify_done = 0;
862
863         /* Remove all elements from the resync LRU. Since future actions
864          * might set bits in the (main) bitmap, then the entries in the
865          * resync LRU would be wrong. */
866         if (drbd_rs_del_all(device)) {
867                 /* In case this is not possible now, most probably because
868                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
869                  * queue (or even the read operations for those packets
870                  * is not finished by now).   Retry in 100ms. */
871
872                 schedule_timeout_interruptible(HZ / 10);
873                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
874                 if (dw) {
875                         dw->w.cb = w_resync_finished;
876                         dw->device = device;
877                         drbd_queue_work(&connection->sender_work, &dw->w);
878                         return 1;
879                 }
880                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
881         }
882
883         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
884         if (dt <= 0)
885                 dt = 1;
886
887         db = device->rs_total;
888         /* adjust for verify start and stop sectors, respective reached position */
889         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
890                 db -= device->ov_left;
891
892         dbdt = Bit2KB(db/dt);
893         device->rs_paused /= HZ;
894
895         if (!get_ldev(device))
896                 goto out;
897
898         ping_peer(device);
899
900         spin_lock_irq(&device->resource->req_lock);
901         os = drbd_read_state(device);
902
903         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
904
905         /* This protects us against multiple calls (that can happen in the presence
906            of application IO), and against connectivity loss just before we arrive here. */
907         if (os.conn <= C_CONNECTED)
908                 goto out_unlock;
909
910         ns = os;
911         ns.conn = C_CONNECTED;
912
913         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
914              verify_done ? "Online verify" : "Resync",
915              dt + device->rs_paused, device->rs_paused, dbdt);
916
917         n_oos = drbd_bm_total_weight(device);
918
919         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
920                 if (n_oos) {
921                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
922                               n_oos, Bit2KB(1));
923                         khelper_cmd = "out-of-sync";
924                 }
925         } else {
926                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
927
928                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
929                         khelper_cmd = "after-resync-target";
930
931                 if (device->use_csums && device->rs_total) {
932                         const unsigned long s = device->rs_same_csum;
933                         const unsigned long t = device->rs_total;
934                         const int ratio =
935                                 (t == 0)     ? 0 :
936                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
937                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
938                              "transferred %luK total %luK\n",
939                              ratio,
940                              Bit2KB(device->rs_same_csum),
941                              Bit2KB(device->rs_total - device->rs_same_csum),
942                              Bit2KB(device->rs_total));
943                 }
944         }
945
946         if (device->rs_failed) {
947                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
948
949                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
950                         ns.disk = D_INCONSISTENT;
951                         ns.pdsk = D_UP_TO_DATE;
952                 } else {
953                         ns.disk = D_UP_TO_DATE;
954                         ns.pdsk = D_INCONSISTENT;
955                 }
956         } else {
957                 ns.disk = D_UP_TO_DATE;
958                 ns.pdsk = D_UP_TO_DATE;
959
960                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
961                         if (device->p_uuid) {
962                                 int i;
963                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
964                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
965                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
966                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
967                         } else {
968                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
969                         }
970                 }
971
972                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
973                         /* for verify runs, we don't update uuids here,
974                          * so there would be nothing to report. */
975                         drbd_uuid_set_bm(device, 0UL);
976                         drbd_print_uuids(device, "updated UUIDs");
977                         if (device->p_uuid) {
978                                 /* Now the two UUID sets are equal, update what we
979                                  * know of the peer. */
980                                 int i;
981                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
982                                         device->p_uuid[i] = device->ldev->md.uuid[i];
983                         }
984                 }
985         }
986
987         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
988 out_unlock:
989         spin_unlock_irq(&device->resource->req_lock);
990
991         /* If we have been sync source, and have an effective fencing-policy,
992          * once *all* volumes are back in sync, call "unfence". */
993         if (os.conn == C_SYNC_SOURCE) {
994                 enum drbd_disk_state disk_state = D_MASK;
995                 enum drbd_disk_state pdsk_state = D_MASK;
996                 enum drbd_fencing_p fp = FP_DONT_CARE;
997
998                 rcu_read_lock();
999                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000                 if (fp != FP_DONT_CARE) {
1001                         struct drbd_peer_device *peer_device;
1002                         int vnr;
1003                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004                                 struct drbd_device *device = peer_device->device;
1005                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007                         }
1008                 }
1009                 rcu_read_unlock();
1010                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011                         conn_khelper(connection, "unfence-peer");
1012         }
1013
1014         put_ldev(device);
1015 out:
1016         device->rs_total  = 0;
1017         device->rs_failed = 0;
1018         device->rs_paused = 0;
1019
1020         /* reset start sector, if we reached end of device */
1021         if (verify_done && device->ov_left == 0)
1022                 device->ov_start_sector = 0;
1023
1024         drbd_md_sync(device);
1025
1026         if (khelper_cmd)
1027                 drbd_khelper(device, khelper_cmd);
1028
1029         return 1;
1030 }
1031
1032 /* helper */
1033 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034 {
1035         if (drbd_peer_req_has_active_page(peer_req)) {
1036                 /* This might happen if sendpage() has not finished */
1037                 int i = PFN_UP(peer_req->i.size);
1038                 atomic_add(i, &device->pp_in_use_by_net);
1039                 atomic_sub(i, &device->pp_in_use);
1040                 spin_lock_irq(&device->resource->req_lock);
1041                 list_add_tail(&peer_req->w.list, &device->net_ee);
1042                 spin_unlock_irq(&device->resource->req_lock);
1043                 wake_up(&drbd_pp_wait);
1044         } else
1045                 drbd_free_peer_req(device, peer_req);
1046 }
1047
1048 /**
1049  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050  * @w:          work object.
1051  * @cancel:     The connection will be closed anyways
1052  */
1053 int w_e_end_data_req(struct drbd_work *w, int cancel)
1054 {
1055         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056         struct drbd_peer_device *peer_device = peer_req->peer_device;
1057         struct drbd_device *device = peer_device->device;
1058         int err;
1059
1060         if (unlikely(cancel)) {
1061                 drbd_free_peer_req(device, peer_req);
1062                 dec_unacked(device);
1063                 return 0;
1064         }
1065
1066         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068         } else {
1069                 if (drbd_ratelimit())
1070                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071                             (unsigned long long)peer_req->i.sector);
1072
1073                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074         }
1075
1076         dec_unacked(device);
1077
1078         move_to_net_ee_or_free(device, peer_req);
1079
1080         if (unlikely(err))
1081                 drbd_err(device, "drbd_send_block() failed\n");
1082         return err;
1083 }
1084
1085 static bool all_zero(struct drbd_peer_request *peer_req)
1086 {
1087         struct page *page = peer_req->pages;
1088         unsigned int len = peer_req->i.size;
1089
1090         page_chain_for_each(page) {
1091                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092                 unsigned int i, words = l / sizeof(long);
1093                 unsigned long *d;
1094
1095                 d = kmap_atomic(page);
1096                 for (i = 0; i < words; i++) {
1097                         if (d[i]) {
1098                                 kunmap_atomic(d);
1099                                 return false;
1100                         }
1101                 }
1102                 kunmap_atomic(d);
1103                 len -= l;
1104         }
1105
1106         return true;
1107 }
1108
1109 /**
1110  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111  * @w:          work object.
1112  * @cancel:     The connection will be closed anyways
1113  */
1114 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115 {
1116         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117         struct drbd_peer_device *peer_device = peer_req->peer_device;
1118         struct drbd_device *device = peer_device->device;
1119         int err;
1120
1121         if (unlikely(cancel)) {
1122                 drbd_free_peer_req(device, peer_req);
1123                 dec_unacked(device);
1124                 return 0;
1125         }
1126
1127         if (get_ldev_if_state(device, D_FAILED)) {
1128                 drbd_rs_complete_io(device, peer_req->i.sector);
1129                 put_ldev(device);
1130         }
1131
1132         if (device->state.conn == C_AHEAD) {
1133                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136                         inc_rs_pending(peer_device);
1137                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1139                         else
1140                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141                 } else {
1142                         if (drbd_ratelimit())
1143                                 drbd_err(device, "Not sending RSDataReply, "
1144                                     "partner DISKLESS!\n");
1145                         err = 0;
1146                 }
1147         } else {
1148                 if (drbd_ratelimit())
1149                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150                             (unsigned long long)peer_req->i.sector);
1151
1152                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153
1154                 /* update resync data with failure */
1155                 drbd_rs_failed_io(peer_device, peer_req->i.sector, peer_req->i.size);
1156         }
1157
1158         dec_unacked(device);
1159
1160         move_to_net_ee_or_free(device, peer_req);
1161
1162         if (unlikely(err))
1163                 drbd_err(device, "drbd_send_block() failed\n");
1164         return err;
1165 }
1166
1167 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168 {
1169         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170         struct drbd_peer_device *peer_device = peer_req->peer_device;
1171         struct drbd_device *device = peer_device->device;
1172         struct digest_info *di;
1173         int digest_size;
1174         void *digest = NULL;
1175         int err, eq = 0;
1176
1177         if (unlikely(cancel)) {
1178                 drbd_free_peer_req(device, peer_req);
1179                 dec_unacked(device);
1180                 return 0;
1181         }
1182
1183         if (get_ldev(device)) {
1184                 drbd_rs_complete_io(device, peer_req->i.sector);
1185                 put_ldev(device);
1186         }
1187
1188         di = peer_req->digest;
1189
1190         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191                 /* quick hack to try to avoid a race against reconfiguration.
1192                  * a real fix would be much more involved,
1193                  * introducing more locking mechanisms */
1194                 if (peer_device->connection->csums_tfm) {
1195                         digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1196                         D_ASSERT(device, digest_size == di->digest_size);
1197                         digest = kmalloc(digest_size, GFP_NOIO);
1198                 }
1199                 if (digest) {
1200                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201                         eq = !memcmp(digest, di->digest, digest_size);
1202                         kfree(digest);
1203                 }
1204
1205                 if (eq) {
1206                         drbd_set_in_sync(peer_device, peer_req->i.sector, peer_req->i.size);
1207                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1208                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210                 } else {
1211                         inc_rs_pending(peer_device);
1212                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214                         kfree(di);
1215                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216                 }
1217         } else {
1218                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219                 if (drbd_ratelimit())
1220                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221         }
1222
1223         dec_unacked(device);
1224         move_to_net_ee_or_free(device, peer_req);
1225
1226         if (unlikely(err))
1227                 drbd_err(device, "drbd_send_block/ack() failed\n");
1228         return err;
1229 }
1230
1231 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232 {
1233         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234         struct drbd_peer_device *peer_device = peer_req->peer_device;
1235         struct drbd_device *device = peer_device->device;
1236         sector_t sector = peer_req->i.sector;
1237         unsigned int size = peer_req->i.size;
1238         int digest_size;
1239         void *digest;
1240         int err = 0;
1241
1242         if (unlikely(cancel))
1243                 goto out;
1244
1245         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1246         digest = kmalloc(digest_size, GFP_NOIO);
1247         if (!digest) {
1248                 err = 1;        /* terminate the connection in case the allocation failed */
1249                 goto out;
1250         }
1251
1252         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254         else
1255                 memset(digest, 0, digest_size);
1256
1257         /* Free e and pages before send.
1258          * In case we block on congestion, we could otherwise run into
1259          * some distributed deadlock, if the other side blocks on
1260          * congestion as well, because our receiver blocks in
1261          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262         drbd_free_peer_req(device, peer_req);
1263         peer_req = NULL;
1264         inc_rs_pending(peer_device);
1265         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266         if (err)
1267                 dec_rs_pending(peer_device);
1268         kfree(digest);
1269
1270 out:
1271         if (peer_req)
1272                 drbd_free_peer_req(device, peer_req);
1273         dec_unacked(device);
1274         return err;
1275 }
1276
1277 void drbd_ov_out_of_sync_found(struct drbd_peer_device *peer_device, sector_t sector, int size)
1278 {
1279         struct drbd_device *device = peer_device->device;
1280         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281                 device->ov_last_oos_size += size>>9;
1282         } else {
1283                 device->ov_last_oos_start = sector;
1284                 device->ov_last_oos_size = size>>9;
1285         }
1286         drbd_set_out_of_sync(peer_device, sector, size);
1287 }
1288
1289 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290 {
1291         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292         struct drbd_peer_device *peer_device = peer_req->peer_device;
1293         struct drbd_device *device = peer_device->device;
1294         struct digest_info *di;
1295         void *digest;
1296         sector_t sector = peer_req->i.sector;
1297         unsigned int size = peer_req->i.size;
1298         int digest_size;
1299         int err, eq = 0;
1300         bool stop_sector_reached = false;
1301
1302         if (unlikely(cancel)) {
1303                 drbd_free_peer_req(device, peer_req);
1304                 dec_unacked(device);
1305                 return 0;
1306         }
1307
1308         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309          * the resync lru has been cleaned up already */
1310         if (get_ldev(device)) {
1311                 drbd_rs_complete_io(device, peer_req->i.sector);
1312                 put_ldev(device);
1313         }
1314
1315         di = peer_req->digest;
1316
1317         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318                 digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319                 digest = kmalloc(digest_size, GFP_NOIO);
1320                 if (digest) {
1321                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323                         D_ASSERT(device, digest_size == di->digest_size);
1324                         eq = !memcmp(digest, di->digest, digest_size);
1325                         kfree(digest);
1326                 }
1327         }
1328
1329         /* Free peer_req and pages before send.
1330          * In case we block on congestion, we could otherwise run into
1331          * some distributed deadlock, if the other side blocks on
1332          * congestion as well, because our receiver blocks in
1333          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334         drbd_free_peer_req(device, peer_req);
1335         if (!eq)
1336                 drbd_ov_out_of_sync_found(peer_device, sector, size);
1337         else
1338                 ov_out_of_sync_print(peer_device);
1339
1340         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343         dec_unacked(device);
1344
1345         --device->ov_left;
1346
1347         /* let's advance progress step marks only for every other megabyte */
1348         if ((device->ov_left & 0x200) == 0x200)
1349                 drbd_advance_rs_marks(peer_device, device->ov_left);
1350
1351         stop_sector_reached = verify_can_do_stop_sector(device) &&
1352                 (sector + (size>>9)) >= device->ov_stop_sector;
1353
1354         if (device->ov_left == 0 || stop_sector_reached) {
1355                 ov_out_of_sync_print(peer_device);
1356                 drbd_resync_finished(peer_device);
1357         }
1358
1359         return err;
1360 }
1361
1362 /* FIXME
1363  * We need to track the number of pending barrier acks,
1364  * and to be able to wait for them.
1365  * See also comment in drbd_adm_attach before drbd_suspend_io.
1366  */
1367 static int drbd_send_barrier(struct drbd_connection *connection)
1368 {
1369         struct p_barrier *p;
1370         struct drbd_socket *sock;
1371
1372         sock = &connection->data;
1373         p = conn_prepare_command(connection, sock);
1374         if (!p)
1375                 return -EIO;
1376         p->barrier = connection->send.current_epoch_nr;
1377         p->pad = 0;
1378         connection->send.current_epoch_writes = 0;
1379         connection->send.last_sent_barrier_jif = jiffies;
1380
1381         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382 }
1383
1384 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385 {
1386         struct drbd_socket *sock = &pd->connection->data;
1387         if (!drbd_prepare_command(pd, sock))
1388                 return -EIO;
1389         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390 }
1391
1392 int w_send_write_hint(struct drbd_work *w, int cancel)
1393 {
1394         struct drbd_device *device =
1395                 container_of(w, struct drbd_device, unplug_work);
1396
1397         if (cancel)
1398                 return 0;
1399         return pd_send_unplug_remote(first_peer_device(device));
1400 }
1401
1402 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403 {
1404         if (!connection->send.seen_any_write_yet) {
1405                 connection->send.seen_any_write_yet = true;
1406                 connection->send.current_epoch_nr = epoch;
1407                 connection->send.current_epoch_writes = 0;
1408                 connection->send.last_sent_barrier_jif = jiffies;
1409         }
1410 }
1411
1412 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413 {
1414         /* re-init if first write on this connection */
1415         if (!connection->send.seen_any_write_yet)
1416                 return;
1417         if (connection->send.current_epoch_nr != epoch) {
1418                 if (connection->send.current_epoch_writes)
1419                         drbd_send_barrier(connection);
1420                 connection->send.current_epoch_nr = epoch;
1421         }
1422 }
1423
1424 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425 {
1426         struct drbd_request *req = container_of(w, struct drbd_request, w);
1427         struct drbd_device *device = req->device;
1428         struct drbd_peer_device *const peer_device = first_peer_device(device);
1429         struct drbd_connection *const connection = peer_device->connection;
1430         int err;
1431
1432         if (unlikely(cancel)) {
1433                 req_mod(req, SEND_CANCELED, peer_device);
1434                 return 0;
1435         }
1436         req->pre_send_jif = jiffies;
1437
1438         /* this time, no connection->send.current_epoch_writes++;
1439          * If it was sent, it was the closing barrier for the last
1440          * replicated epoch, before we went into AHEAD mode.
1441          * No more barriers will be sent, until we leave AHEAD mode again. */
1442         maybe_send_barrier(connection, req->epoch);
1443
1444         err = drbd_send_out_of_sync(peer_device, req);
1445         req_mod(req, OOS_HANDED_TO_NETWORK, peer_device);
1446
1447         return err;
1448 }
1449
1450 /**
1451  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452  * @w:          work object.
1453  * @cancel:     The connection will be closed anyways
1454  */
1455 int w_send_dblock(struct drbd_work *w, int cancel)
1456 {
1457         struct drbd_request *req = container_of(w, struct drbd_request, w);
1458         struct drbd_device *device = req->device;
1459         struct drbd_peer_device *const peer_device = first_peer_device(device);
1460         struct drbd_connection *connection = peer_device->connection;
1461         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462         int err;
1463
1464         if (unlikely(cancel)) {
1465                 req_mod(req, SEND_CANCELED, peer_device);
1466                 return 0;
1467         }
1468         req->pre_send_jif = jiffies;
1469
1470         re_init_if_first_write(connection, req->epoch);
1471         maybe_send_barrier(connection, req->epoch);
1472         connection->send.current_epoch_writes++;
1473
1474         err = drbd_send_dblock(peer_device, req);
1475         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1476
1477         if (do_send_unplug && !err)
1478                 pd_send_unplug_remote(peer_device);
1479
1480         return err;
1481 }
1482
1483 /**
1484  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485  * @w:          work object.
1486  * @cancel:     The connection will be closed anyways
1487  */
1488 int w_send_read_req(struct drbd_work *w, int cancel)
1489 {
1490         struct drbd_request *req = container_of(w, struct drbd_request, w);
1491         struct drbd_device *device = req->device;
1492         struct drbd_peer_device *const peer_device = first_peer_device(device);
1493         struct drbd_connection *connection = peer_device->connection;
1494         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495         int err;
1496
1497         if (unlikely(cancel)) {
1498                 req_mod(req, SEND_CANCELED, peer_device);
1499                 return 0;
1500         }
1501         req->pre_send_jif = jiffies;
1502
1503         /* Even read requests may close a write epoch,
1504          * if there was any yet. */
1505         maybe_send_barrier(connection, req->epoch);
1506
1507         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508                                  (unsigned long)req);
1509
1510         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK, peer_device);
1511
1512         if (do_send_unplug && !err)
1513                 pd_send_unplug_remote(peer_device);
1514
1515         return err;
1516 }
1517
1518 int w_restart_disk_io(struct drbd_work *w, int cancel)
1519 {
1520         struct drbd_request *req = container_of(w, struct drbd_request, w);
1521         struct drbd_device *device = req->device;
1522
1523         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524                 drbd_al_begin_io(device, &req->i);
1525
1526         req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1527                                            req->master_bio, GFP_NOIO,
1528                                           &drbd_io_bio_set);
1529         req->private_bio->bi_private = req;
1530         req->private_bio->bi_end_io = drbd_request_endio;
1531         submit_bio_noacct(req->private_bio);
1532
1533         return 0;
1534 }
1535
1536 static int _drbd_may_sync_now(struct drbd_device *device)
1537 {
1538         struct drbd_device *odev = device;
1539         int resync_after;
1540
1541         while (1) {
1542                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1543                         return 1;
1544                 rcu_read_lock();
1545                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1546                 rcu_read_unlock();
1547                 if (resync_after == -1)
1548                         return 1;
1549                 odev = minor_to_device(resync_after);
1550                 if (!odev)
1551                         return 1;
1552                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1553                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1554                     odev->state.aftr_isp || odev->state.peer_isp ||
1555                     odev->state.user_isp)
1556                         return 0;
1557         }
1558 }
1559
1560 /**
1561  * drbd_pause_after() - Pause resync on all devices that may not resync now
1562  * @device:     DRBD device.
1563  *
1564  * Called from process context only (admin command and after_state_ch).
1565  */
1566 static bool drbd_pause_after(struct drbd_device *device)
1567 {
1568         bool changed = false;
1569         struct drbd_device *odev;
1570         int i;
1571
1572         rcu_read_lock();
1573         idr_for_each_entry(&drbd_devices, odev, i) {
1574                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1575                         continue;
1576                 if (!_drbd_may_sync_now(odev) &&
1577                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1578                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1579                         changed = true;
1580         }
1581         rcu_read_unlock();
1582
1583         return changed;
1584 }
1585
1586 /**
1587  * drbd_resume_next() - Resume resync on all devices that may resync now
1588  * @device:     DRBD device.
1589  *
1590  * Called from process context only (admin command and worker).
1591  */
1592 static bool drbd_resume_next(struct drbd_device *device)
1593 {
1594         bool changed = false;
1595         struct drbd_device *odev;
1596         int i;
1597
1598         rcu_read_lock();
1599         idr_for_each_entry(&drbd_devices, odev, i) {
1600                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1601                         continue;
1602                 if (odev->state.aftr_isp) {
1603                         if (_drbd_may_sync_now(odev) &&
1604                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1605                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1606                                 changed = true;
1607                 }
1608         }
1609         rcu_read_unlock();
1610         return changed;
1611 }
1612
/* Run drbd_resume_next() for @device with all resources locked. */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);	/* "changed" result is not needed here */
	unlock_all_resources();
}
1619
/* Run drbd_pause_after() for @device with all resources locked. */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);	/* "changed" result is not needed here */
	unlock_all_resources();
}
1626
1627 /* caller must lock_all_resources() */
1628 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1629 {
1630         struct drbd_device *odev;
1631         int resync_after;
1632
1633         if (o_minor == -1)
1634                 return NO_ERROR;
1635         if (o_minor < -1 || o_minor > MINORMASK)
1636                 return ERR_RESYNC_AFTER;
1637
1638         /* check for loops */
1639         odev = minor_to_device(o_minor);
1640         while (1) {
1641                 if (odev == device)
1642                         return ERR_RESYNC_AFTER_CYCLE;
1643
1644                 /* You are free to depend on diskless, non-existing,
1645                  * or not yet/no longer existing minors.
1646                  * We only reject dependency loops.
1647                  * We cannot follow the dependency chain beyond a detached or
1648                  * missing minor.
1649                  */
1650                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1651                         return NO_ERROR;
1652
1653                 rcu_read_lock();
1654                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1655                 rcu_read_unlock();
1656                 /* dependency chain ends here, no cycles. */
1657                 if (resync_after == -1)
1658                         return NO_ERROR;
1659
1660                 /* follow the dependency chain */
1661                 odev = minor_to_device(resync_after);
1662         }
1663 }
1664
1665 /* caller must lock_all_resources() */
1666 void drbd_resync_after_changed(struct drbd_device *device)
1667 {
1668         int changed;
1669
1670         do {
1671                 changed  = drbd_pause_after(device);
1672                 changed |= drbd_resume_next(device);
1673         } while (changed);
1674 }
1675
1676 void drbd_rs_controller_reset(struct drbd_peer_device *peer_device)
1677 {
1678         struct drbd_device *device = peer_device->device;
1679         struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1680         struct fifo_buffer *plan;
1681
1682         atomic_set(&device->rs_sect_in, 0);
1683         atomic_set(&device->rs_sect_ev, 0);
1684         device->rs_in_flight = 0;
1685         device->rs_last_events =
1686                 (int)part_stat_read_accum(disk->part0, sectors);
1687
1688         /* Updating the RCU protected object in place is necessary since
1689            this function gets called from atomic context.
1690            It is valid since all other updates also lead to an completely
1691            empty fifo */
1692         rcu_read_lock();
1693         plan = rcu_dereference(device->rs_plan_s);
1694         plan->total = 0;
1695         fifo_set(plan, 0);
1696         rcu_read_unlock();
1697 }
1698
1699 void start_resync_timer_fn(struct timer_list *t)
1700 {
1701         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1702         drbd_device_post_work(device, RS_START);
1703 }
1704
1705 static void do_start_resync(struct drbd_device *device)
1706 {
1707         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1708                 drbd_warn(device, "postponing start_resync ...\n");
1709                 device->start_resync_timer.expires = jiffies + HZ/10;
1710                 add_timer(&device->start_resync_timer);
1711                 return;
1712         }
1713
1714         drbd_start_resync(device, C_SYNC_SOURCE);
1715         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1716 }
1717
1718 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1719 {
1720         bool csums_after_crash_only;
1721         rcu_read_lock();
1722         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1723         rcu_read_unlock();
1724         return connection->agreed_pro_version >= 89 &&          /* supported? */
1725                 connection->csums_tfm &&                        /* configured? */
1726                 (csums_after_crash_only == false                /* use for each resync? */
1727                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1728 }
1729
1730 /**
1731  * drbd_start_resync() - Start the resync process
1732  * @device:     DRBD device.
1733  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1734  *
1735  * This function might bring you directly into one of the
1736  * C_PAUSED_SYNC_* states.
1737  */
1738 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1739 {
1740         struct drbd_peer_device *peer_device = first_peer_device(device);
1741         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1742         union drbd_state ns;
1743         int r;
1744
1745         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1746                 drbd_err(device, "Resync already running!\n");
1747                 return;
1748         }
1749
1750         if (!connection) {
1751                 drbd_err(device, "No connection to peer, aborting!\n");
1752                 return;
1753         }
1754
1755         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1756                 if (side == C_SYNC_TARGET) {
1757                         /* Since application IO was locked out during C_WF_BITMAP_T and
1758                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1759                            we check that we might make the data inconsistent. */
1760                         r = drbd_khelper(device, "before-resync-target");
1761                         r = (r >> 8) & 0xff;
1762                         if (r > 0) {
1763                                 drbd_info(device, "before-resync-target handler returned %d, "
1764                                          "dropping connection.\n", r);
1765                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1766                                 return;
1767                         }
1768                 } else /* C_SYNC_SOURCE */ {
1769                         r = drbd_khelper(device, "before-resync-source");
1770                         r = (r >> 8) & 0xff;
1771                         if (r > 0) {
1772                                 if (r == 3) {
1773                                         drbd_info(device, "before-resync-source handler returned %d, "
1774                                                  "ignoring. Old userland tools?", r);
1775                                 } else {
1776                                         drbd_info(device, "before-resync-source handler returned %d, "
1777                                                  "dropping connection.\n", r);
1778                                         conn_request_state(connection,
1779                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1780                                         return;
1781                                 }
1782                         }
1783                 }
1784         }
1785
1786         if (current == connection->worker.task) {
1787                 /* The worker should not sleep waiting for state_mutex,
1788                    that can take long */
1789                 if (!mutex_trylock(device->state_mutex)) {
1790                         set_bit(B_RS_H_DONE, &device->flags);
1791                         device->start_resync_timer.expires = jiffies + HZ/5;
1792                         add_timer(&device->start_resync_timer);
1793                         return;
1794                 }
1795         } else {
1796                 mutex_lock(device->state_mutex);
1797         }
1798
1799         lock_all_resources();
1800         clear_bit(B_RS_H_DONE, &device->flags);
1801         /* Did some connection breakage or IO error race with us? */
1802         if (device->state.conn < C_CONNECTED
1803         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1804                 unlock_all_resources();
1805                 goto out;
1806         }
1807
1808         ns = drbd_read_state(device);
1809
1810         ns.aftr_isp = !_drbd_may_sync_now(device);
1811
1812         ns.conn = side;
1813
1814         if (side == C_SYNC_TARGET)
1815                 ns.disk = D_INCONSISTENT;
1816         else /* side == C_SYNC_SOURCE */
1817                 ns.pdsk = D_INCONSISTENT;
1818
1819         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1820         ns = drbd_read_state(device);
1821
1822         if (ns.conn < C_CONNECTED)
1823                 r = SS_UNKNOWN_ERROR;
1824
1825         if (r == SS_SUCCESS) {
1826                 unsigned long tw = drbd_bm_total_weight(device);
1827                 unsigned long now = jiffies;
1828                 int i;
1829
1830                 device->rs_failed    = 0;
1831                 device->rs_paused    = 0;
1832                 device->rs_same_csum = 0;
1833                 device->rs_last_sect_ev = 0;
1834                 device->rs_total     = tw;
1835                 device->rs_start     = now;
1836                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1837                         device->rs_mark_left[i] = tw;
1838                         device->rs_mark_time[i] = now;
1839                 }
1840                 drbd_pause_after(device);
1841                 /* Forget potentially stale cached per resync extent bit-counts.
1842                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1843                  * disabled, and know the disk state is ok. */
1844                 spin_lock(&device->al_lock);
1845                 lc_reset(device->resync);
1846                 device->resync_locked = 0;
1847                 device->resync_wenr = LC_FREE;
1848                 spin_unlock(&device->al_lock);
1849         }
1850         unlock_all_resources();
1851
1852         if (r == SS_SUCCESS) {
1853                 wake_up(&device->al_wait); /* for lc_reset() above */
1854                 /* reset rs_last_bcast when a resync or verify is started,
1855                  * to deal with potential jiffies wrap. */
1856                 device->rs_last_bcast = jiffies - HZ;
1857
1858                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1859                      drbd_conn_str(ns.conn),
1860                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1861                      (unsigned long) device->rs_total);
1862                 if (side == C_SYNC_TARGET) {
1863                         device->bm_resync_fo = 0;
1864                         device->use_csums = use_checksum_based_resync(connection, device);
1865                 } else {
1866                         device->use_csums = false;
1867                 }
1868
1869                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1870                  * with w_send_oos, or the sync target will get confused as to
1871                  * how much bits to resync.  We cannot do that always, because for an
1872                  * empty resync and protocol < 95, we need to do it here, as we call
1873                  * drbd_resync_finished from here in that case.
1874                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1875                  * and from after_state_ch otherwise. */
1876                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1877                         drbd_gen_and_send_sync_uuid(peer_device);
1878
1879                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1880                         /* This still has a race (about when exactly the peers
1881                          * detect connection loss) that can lead to a full sync
1882                          * on next handshake. In 8.3.9 we fixed this with explicit
1883                          * resync-finished notifications, but the fix
1884                          * introduces a protocol change.  Sleeping for some
1885                          * time longer than the ping interval + timeout on the
1886                          * SyncSource, to give the SyncTarget the chance to
1887                          * detect connection loss, then waiting for a ping
1888                          * response (implicit in drbd_resync_finished) reduces
1889                          * the race considerably, but does not solve it. */
1890                         if (side == C_SYNC_SOURCE) {
1891                                 struct net_conf *nc;
1892                                 int timeo;
1893
1894                                 rcu_read_lock();
1895                                 nc = rcu_dereference(connection->net_conf);
1896                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1897                                 rcu_read_unlock();
1898                                 schedule_timeout_interruptible(timeo);
1899                         }
1900                         drbd_resync_finished(peer_device);
1901                 }
1902
1903                 drbd_rs_controller_reset(peer_device);
1904                 /* ns.conn may already be != device->state.conn,
1905                  * we may have been paused in between, or become paused until
1906                  * the timer triggers.
1907                  * No matter, that is handled in resync_timer_fn() */
1908                 if (ns.conn == C_SYNC_TARGET)
1909                         mod_timer(&device->resync_timer, jiffies);
1910
1911                 drbd_md_sync(device);
1912         }
1913         put_ldev(device);
1914 out:
1915         mutex_unlock(device->state_mutex);
1916 }
1917
1918 static void update_on_disk_bitmap(struct drbd_peer_device *peer_device, bool resync_done)
1919 {
1920         struct drbd_device *device = peer_device->device;
1921         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1922         device->rs_last_bcast = jiffies;
1923
1924         if (!get_ldev(device))
1925                 return;
1926
1927         drbd_bm_write_lazy(device, 0);
1928         if (resync_done && is_sync_state(device->state.conn))
1929                 drbd_resync_finished(peer_device);
1930
1931         drbd_bcast_event(device, &sib);
1932         /* update timestamp, in case it took a while to write out stuff */
1933         device->rs_last_bcast = jiffies;
1934         put_ldev(device);
1935 }
1936
1937 static void drbd_ldev_destroy(struct drbd_device *device)
1938 {
1939         lc_destroy(device->resync);
1940         device->resync = NULL;
1941         lc_destroy(device->act_log);
1942         device->act_log = NULL;
1943
1944         __acquire(local);
1945         drbd_backing_dev_free(device, device->ldev);
1946         device->ldev = NULL;
1947         __release(local);
1948
1949         clear_bit(GOING_DISKLESS, &device->flags);
1950         wake_up(&device->misc_wait);
1951 }
1952
1953 static void go_diskless(struct drbd_device *device)
1954 {
1955         struct drbd_peer_device *peer_device = first_peer_device(device);
1956         D_ASSERT(device, device->state.disk == D_FAILED);
1957         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1958          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1959          * the protected members anymore, though, so once put_ldev reaches zero
1960          * again, it will be safe to free them. */
1961
1962         /* Try to write changed bitmap pages, read errors may have just
1963          * set some bits outside the area covered by the activity log.
1964          *
1965          * If we have an IO error during the bitmap writeout,
1966          * we will want a full sync next time, just in case.
1967          * (Do we want a specific meta data flag for this?)
1968          *
1969          * If that does not make it to stable storage either,
1970          * we cannot do anything about that anymore.
1971          *
1972          * We still need to check if both bitmap and ldev are present, we may
1973          * end up here after a failed attach, before ldev was even assigned.
1974          */
1975         if (device->bitmap && device->ldev) {
1976                 /* An interrupted resync or similar is allowed to recounts bits
1977                  * while we detach.
1978                  * Any modifications would not be expected anymore, though.
1979                  */
1980                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1981                                         "detach", BM_LOCKED_TEST_ALLOWED, peer_device)) {
1982                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1983                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1984                                 drbd_md_sync(device);
1985                         }
1986                 }
1987         }
1988
1989         drbd_force_state(device, NS(disk, D_DISKLESS));
1990 }
1991
/*
 * Device work for MD_SYNC: log that the md_sync_timer expired and write
 * the meta data from worker context.  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
1998
1999 /* only called from drbd_worker thread, no locking */
2000 void __update_timing_details(
2001                 struct drbd_thread_timing_details *tdp,
2002                 unsigned int *cb_nr,
2003                 void *cb,
2004                 const char *fn, const unsigned int line)
2005 {
2006         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2007         struct drbd_thread_timing_details *td = tdp + i;
2008
2009         td->start_jif = jiffies;
2010         td->cb_addr = cb;
2011         td->caller_fn = fn;
2012         td->line = line;
2013         td->cb_nr = *cb_nr;
2014
2015         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2016         td = tdp + i;
2017         memset(td, 0, sizeof(*td));
2018
2019         ++(*cb_nr);
2020 }
2021
2022 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2023 {
2024         if (test_bit(MD_SYNC, &todo))
2025                 do_md_sync(device);
2026         if (test_bit(RS_DONE, &todo) ||
2027             test_bit(RS_PROGRESS, &todo))
2028                 update_on_disk_bitmap(first_peer_device(device), test_bit(RS_DONE, &todo));
2029         if (test_bit(GO_DISKLESS, &todo))
2030                 go_diskless(device);
2031         if (test_bit(DESTROY_DISK, &todo))
2032                 drbd_ldev_destroy(device);
2033         if (test_bit(RS_START, &todo))
2034                 do_start_resync(device);
2035 }
2036
/* All device->flags bits that request work handled by do_device_work(). */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << GO_DISKLESS) |		\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << MD_SYNC) |		\
	 (1UL << RS_START) |		\
	 (1UL << RS_PROGRESS) |		\
	 (1UL << RS_DONE))
2045
2046 static unsigned long get_work_bits(unsigned long *flags)
2047 {
2048         unsigned long old, new;
2049         do {
2050                 old = *flags;
2051                 new = old & ~DRBD_DEVICE_WORK_MASK;
2052         } while (cmpxchg(flags, old, new) != old);
2053         return old & DRBD_DEVICE_WORK_MASK;
2054 }
2055
2056 static void do_unqueued_work(struct drbd_connection *connection)
2057 {
2058         struct drbd_peer_device *peer_device;
2059         int vnr;
2060
2061         rcu_read_lock();
2062         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2063                 struct drbd_device *device = peer_device->device;
2064                 unsigned long todo = get_work_bits(&device->flags);
2065                 if (!todo)
2066                         continue;
2067
2068                 kref_get(&device->kref);
2069                 rcu_read_unlock();
2070                 do_device_work(device, todo);
2071                 kref_put(&device->kref, drbd_destroy_device);
2072                 rcu_read_lock();
2073         }
2074         rcu_read_unlock();
2075 }
2076
2077 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2078 {
2079         spin_lock_irq(&queue->q_lock);
2080         list_splice_tail_init(&queue->q, work_list);
2081         spin_unlock_irq(&queue->q_lock);
2082         return !list_empty(work_list);
2083 }
2084
2085 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2086 {
2087         DEFINE_WAIT(wait);
2088         struct net_conf *nc;
2089         int uncork, cork;
2090
2091         dequeue_work_batch(&connection->sender_work, work_list);
2092         if (!list_empty(work_list))
2093                 return;
2094
2095         /* Still nothing to do?
2096          * Maybe we still need to close the current epoch,
2097          * even if no new requests are queued yet.
2098          *
2099          * Also, poke TCP, just in case.
2100          * Then wait for new work (or signal). */
2101         rcu_read_lock();
2102         nc = rcu_dereference(connection->net_conf);
2103         uncork = nc ? nc->tcp_cork : 0;
2104         rcu_read_unlock();
2105         if (uncork) {
2106                 mutex_lock(&connection->data.mutex);
2107                 if (connection->data.socket)
2108                         tcp_sock_set_cork(connection->data.socket->sk, false);
2109                 mutex_unlock(&connection->data.mutex);
2110         }
2111
2112         for (;;) {
2113                 int send_barrier;
2114                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2115                 spin_lock_irq(&connection->resource->req_lock);
2116                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2117                 if (!list_empty(&connection->sender_work.q))
2118                         list_splice_tail_init(&connection->sender_work.q, work_list);
2119                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2120                 if (!list_empty(work_list) || signal_pending(current)) {
2121                         spin_unlock_irq(&connection->resource->req_lock);
2122                         break;
2123                 }
2124
2125                 /* We found nothing new to do, no to-be-communicated request,
2126                  * no other work item.  We may still need to close the last
2127                  * epoch.  Next incoming request epoch will be connection ->
2128                  * current transfer log epoch number.  If that is different
2129                  * from the epoch of the last request we communicated, it is
2130                  * safe to send the epoch separating barrier now.
2131                  */
2132                 send_barrier =
2133                         atomic_read(&connection->current_tle_nr) !=
2134                         connection->send.current_epoch_nr;
2135                 spin_unlock_irq(&connection->resource->req_lock);
2136
2137                 if (send_barrier)
2138                         maybe_send_barrier(connection,
2139                                         connection->send.current_epoch_nr + 1);
2140
2141                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2142                         break;
2143
2144                 /* drbd_send() may have called flush_signals() */
2145                 if (get_t_state(&connection->worker) != RUNNING)
2146                         break;
2147
2148                 schedule();
2149                 /* may be woken up for other things but new work, too,
2150                  * e.g. if the current epoch got closed.
2151                  * In which case we send the barrier above. */
2152         }
2153         finish_wait(&connection->sender_work.q_wait, &wait);
2154
2155         /* someone may have changed the config while we have been waiting above. */
2156         rcu_read_lock();
2157         nc = rcu_dereference(connection->net_conf);
2158         cork = nc ? nc->tcp_cork : 0;
2159         rcu_read_unlock();
2160         mutex_lock(&connection->data.mutex);
2161         if (connection->data.socket) {
2162                 if (cork)
2163                         tcp_sock_set_cork(connection->data.socket->sk, true);
2164                 else if (!uncork)
2165                         tcp_sock_set_cork(connection->data.socket->sk, false);
2166         }
2167         mutex_unlock(&connection->data.mutex);
2168 }
2169
2170 int drbd_worker(struct drbd_thread *thi)
2171 {
2172         struct drbd_connection *connection = thi->connection;
2173         struct drbd_work *w = NULL;
2174         struct drbd_peer_device *peer_device;
2175         LIST_HEAD(work_list);
2176         int vnr;
2177
2178         while (get_t_state(thi) == RUNNING) {
2179                 drbd_thread_current_set_cpu(thi);
2180
2181                 if (list_empty(&work_list)) {
2182                         update_worker_timing_details(connection, wait_for_work);
2183                         wait_for_work(connection, &work_list);
2184                 }
2185
2186                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2187                         update_worker_timing_details(connection, do_unqueued_work);
2188                         do_unqueued_work(connection);
2189                 }
2190
2191                 if (signal_pending(current)) {
2192                         flush_signals(current);
2193                         if (get_t_state(thi) == RUNNING) {
2194                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2195                                 continue;
2196                         }
2197                         break;
2198                 }
2199
2200                 if (get_t_state(thi) != RUNNING)
2201                         break;
2202
2203                 if (!list_empty(&work_list)) {
2204                         w = list_first_entry(&work_list, struct drbd_work, list);
2205                         list_del_init(&w->list);
2206                         update_worker_timing_details(connection, w->cb);
2207                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2208                                 continue;
2209                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2210                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2211                 }
2212         }
2213
2214         do {
2215                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2216                         update_worker_timing_details(connection, do_unqueued_work);
2217                         do_unqueued_work(connection);
2218                 }
2219                 if (!list_empty(&work_list)) {
2220                         w = list_first_entry(&work_list, struct drbd_work, list);
2221                         list_del_init(&w->list);
2222                         update_worker_timing_details(connection, w->cb);
2223                         w->cb(w, 1);
2224                 } else
2225                         dequeue_work_batch(&connection->sender_work, &work_list);
2226         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2227
2228         rcu_read_lock();
2229         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2230                 struct drbd_device *device = peer_device->device;
2231                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2232                 kref_get(&device->kref);
2233                 rcu_read_unlock();
2234                 drbd_device_cleanup(device);
2235                 kref_put(&device->kref, drbd_destroy_device);
2236                 rcu_read_lock();
2237         }
2238         rcu_read_unlock();
2239
2240         return 0;
2241 }