Line data Source code
1 : // SPDX-License-Identifier: GPL-3.0-or-later
2 : // Copyright (C) 2016 Andrea Mazzoleni
3 :
4 : #include "portable.h"
5 :
6 : #include "io.h"
7 :
8 : void (*io_start)(struct snapraid_io* io,
9 : block_off_t blockstart, block_off_t blockmax,
10 : bit_vect_t* block_enabled) = 0;
11 : void (*io_stop)(struct snapraid_io* io) = 0;
12 : block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
13 : struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
14 : struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
15 : void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
16 : void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
17 : void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
18 : void (*io_refresh)(struct snapraid_io* io) = 0;
19 : void (*io_flush)(struct snapraid_io* io) = 0;
20 :
21 : /**
22 : * Get the next block position to process.
23 : */
24 182525 : static block_off_t io_position_next(struct snapraid_io* io)
25 : {
26 : block_off_t blockcur;
27 :
28 : /* get the next position */
29 182525 : if (io->block_enabled) {
30 541891 : while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
31 369267 : ++io->block_next;
32 : }
33 :
34 182525 : blockcur = io->block_next;
35 :
36 : /* next block for the next call */
37 182525 : ++io->block_next;
38 :
39 182525 : return blockcur;
40 : }
41 :
42 : /**
43 : * Setup the next pending task for all readers.
44 : */
45 182525 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
46 : {
47 : unsigned i;
48 :
49 1477830 : for (i = 0; i < io->reader_max; ++i) {
50 1295305 : struct snapraid_worker* worker = &io->reader_map[i];
51 1295305 : struct snapraid_task* task = &worker->task_map[task_index];
52 :
53 : /* setup the new pending task */
54 1295305 : if (blockcur < io->block_max)
55 1201085 : task->state = TASK_STATE_READY;
56 : else
57 94220 : task->state = TASK_STATE_EMPTY;
58 :
59 1295305 : task->path[0] = 0;
60 1295305 : if (worker->handle)
61 1071591 : task->disk = worker->handle->disk;
62 : else
63 223714 : task->disk = 0;
64 :
65 1295305 : assert(worker->buffer_skew + i < io->buffer_max);
66 :
67 1295305 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
68 1295305 : task->position = blockcur;
69 1295305 : task->block = 0;
70 1295305 : task->file = 0;
71 1295305 : task->file_pos = 0;
72 1295305 : task->read_size = 0;
73 1295305 : task->is_timestamp_different = 0;
74 : }
75 182525 : }
76 :
77 : /**
78 : * Setup the next pending task for all writers.
79 : */
80 131776 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
81 : {
82 : unsigned i;
83 :
84 716497 : for (i = 0; i < io->writer_max; ++i) {
85 584721 : struct snapraid_worker* worker = &io->writer_map[i];
86 584721 : struct snapraid_task* task = &worker->task_map[task_index];
87 :
88 : /* setup the new pending task */
89 584721 : task->state = TASK_STATE_READY;
90 584721 : task->path[0] = 0;
91 584721 : task->disk = 0;
92 584721 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
93 584721 : task->position = blockcur;
94 584721 : task->block = 0;
95 584721 : task->file = 0;
96 584721 : task->file_pos = 0;
97 584721 : task->read_size = 0;
98 584721 : task->is_timestamp_different = 0;
99 : }
100 131776 : }
101 :
102 : /**
103 : * Setup an empty next pending task for all writers.
104 : */
105 19 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
106 : {
107 : unsigned i;
108 :
109 133 : for (i = 0; i < io->writer_max; ++i) {
110 114 : struct snapraid_worker* worker = &io->writer_map[i];
111 114 : struct snapraid_task* task = &worker->task_map[task_index];
112 :
113 : /* setup the new pending task */
114 114 : task->state = TASK_STATE_EMPTY;
115 114 : task->path[0] = 0;
116 114 : task->disk = 0;
117 114 : task->buffer = 0;
118 114 : task->position = blockcur;
119 114 : task->block = 0;
120 114 : task->file = 0;
121 114 : task->file_pos = 0;
122 114 : task->read_size = 0;
123 114 : task->is_timestamp_different = 0;
124 : }
125 19 : }
126 :
127 : /*****************************************************************************/
128 : /* mono thread */
129 :
130 9376 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
131 : {
132 : block_off_t blockcur_schedule;
133 :
134 : /* reset the index */
135 9376 : io->reader_index = 0;
136 :
137 9376 : blockcur_schedule = io_position_next(io);
138 :
139 : /* schedule the next read */
140 9376 : io_reader_sched(io, 0, blockcur_schedule);
141 :
142 : /* set the buffer to use */
143 9376 : *buffer = io->buffer_map[0];
144 :
145 9376 : return blockcur_schedule;
146 : }
147 :
148 9374 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
149 : {
150 : unsigned i;
151 :
152 : /* reset the index */
153 9374 : io->writer_index = 0;
154 :
155 : /* clear errors */
156 46870 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
157 37496 : io->writer_error[i] = 0;
158 :
159 9374 : if (skip) {
160 : /* skip the next write */
161 0 : io_writer_sched_empty(io, 0, blockcur);
162 : } else {
163 : /* schedule the next write */
164 9374 : io_writer_sched(io, 0, blockcur);
165 : }
166 9374 : }
167 :
168 9374 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
169 : {
170 : unsigned i;
171 :
172 : (void)blockcur;
173 : (void)skip;
174 :
175 : /* report errors */
176 46870 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
177 37496 : writer_error[i] = io->writer_error[i];
178 9374 : }
179 :
/**
 * Mono thread implementation of io_refresh. It does nothing.
 */
static void io_refresh_mono(struct snapraid_io* io)
{
	/* intentionally empty, only avoid the unused argument warning */
	(void)io;
}
184 :
/**
 * Mono thread implementation of io_flush. It does nothing.
 */
static void io_flush_mono(struct snapraid_io* io)
{
	/* intentionally empty, only avoid the unused argument warning */
	(void)io;
}
189 :
190 56244 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
191 : {
192 : struct snapraid_worker* worker;
193 : struct snapraid_task* task;
194 : unsigned i;
195 :
196 : /* get the next task */
197 56244 : i = io->reader_index++;
198 :
199 56244 : assert(base <= i && i < base + count);
200 :
201 56244 : worker = &io->reader_map[i];
202 56244 : task = &worker->task_map[0];
203 :
204 : /* do the work */
205 56244 : if (task->state != TASK_STATE_EMPTY)
206 56244 : worker->func(worker, task);
207 :
208 : /* return the position */
209 56244 : *pos = i - base;
210 :
211 : /* store the waiting index */
212 56244 : waiting_map[0] = i - base;
213 56244 : *waiting_mac = 1;
214 :
215 56244 : return task;
216 : }
217 :
218 56244 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
219 : {
220 56244 : return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
221 : }
222 :
223 0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
224 : {
225 0 : return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
226 : }
227 :
228 9374 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
229 : {
230 : struct snapraid_worker* worker;
231 : struct snapraid_task* task;
232 : unsigned i;
233 :
234 : /* get the next task */
235 9374 : i = io->writer_index++;
236 :
237 9374 : worker = &io->writer_map[i];
238 9374 : task = &worker->task_map[0];
239 :
240 9374 : io->writer_error[i] = 0;
241 :
242 : /* do the work */
243 9374 : if (task->state != TASK_STATE_EMPTY)
244 9374 : worker->func(worker, task);
245 :
246 : /* return the position */
247 9374 : *pos = i;
248 :
249 : /* store the waiting index */
250 9374 : waiting_map[0] = i;
251 9374 : *waiting_mac = 1;
252 9374 : }
253 :
254 2 : static void io_start_mono(struct snapraid_io* io,
255 : block_off_t blockstart, block_off_t blockmax,
256 : bit_vect_t* block_enabled)
257 : {
258 2 : io->block_start = blockstart;
259 2 : io->block_max = blockmax;
260 2 : io->block_enabled = block_enabled;
261 2 : io->block_next = blockstart;
262 2 : }
263 :
/**
 * Mono thread implementation of io_stop. It does nothing.
 */
static void io_stop_mono(struct snapraid_io* io)
{
	/* intentionally empty, only avoid the unused argument warning */
	(void)io;
}
268 :
269 : /*****************************************************************************/
270 : /* multi thread */
271 :
272 : /* disable multithread if pthread is not present */
273 : #if HAVE_THREAD
274 :
275 : /**
276 : * Get the next task to work on for a reader.
277 : *
278 : * This is the synchronization point for workers with the io.
279 : */
280 1238848 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
281 : {
282 1238848 : struct snapraid_io* io = worker->io;
283 :
284 : /* the synchronization is protected by the io mutex */
285 1238848 : thread_mutex_lock(&io->io_mutex);
286 :
287 1031756 : while (1) {
288 : unsigned next_index;
289 :
290 : /*
291 : * Check if the worker has to exit
292 : * even if there is work to do
293 : */
294 2270604 : if (io->done) {
295 670 : thread_mutex_unlock(&io->io_mutex);
296 670 : return 0;
297 : }
298 :
299 : /* get the next pending task */
300 2269934 : next_index = (worker->index + 1) % io->io_max;
301 :
302 : /* if the queue of pending tasks is not empty */
303 2269934 : if (next_index != io->reader_index) {
304 : struct snapraid_task* task;
305 :
306 : /* the index that the IO may be waiting for */
307 1238112 : unsigned waiting_index = io->reader_index;
308 :
309 : /* the index that worker just completed */
310 1238112 : unsigned done_index = worker->index;
311 :
312 : /* get the new working task */
313 1238112 : worker->index = next_index;
314 1238112 : task = &worker->task_map[worker->index];
315 :
316 : /* if the just completed task is at this index */
317 1238112 : if (done_index == waiting_index) {
318 : /* notify the IO that a new read is complete */
319 1279 : thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
320 : } else {
321 1236833 : thread_mutex_unlock(&io->io_mutex);
322 : }
323 :
324 : /* return the new task */
325 1238112 : return task;
326 : }
327 :
328 : /* otherwise wait for a read_sched event */
329 1031822 : thread_cond_wait(&io->read_sched, &io->io_mutex);
330 : }
331 : }
332 :
333 : /**
334 : * Get the next task to work on for a writer.
335 : *
336 : * This is the synchronization point for workers with the io.
337 : */
338 575974 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
339 : {
340 575974 : struct snapraid_io* io = worker->io;
341 : int error_index;
342 :
343 : /* the synchronization is protected by the io mutex */
344 575974 : thread_mutex_lock(&io->io_mutex);
345 :
346 : /* counts the number of errors in the global state */
347 575974 : error_index = state - IO_WRITER_ERROR_BASE;
348 575974 : if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
349 0 : ++io->writer_error[error_index];
350 :
351 558679 : while (1) {
352 : unsigned next_index;
353 :
354 : /* get the next pending task */
355 1134653 : next_index = (worker->index + 1) % io->io_max;
356 :
357 : /* if the queue of pending tasks is not empty */
358 1134653 : if (next_index != io->writer_index) {
359 : struct snapraid_task* task;
360 :
361 : /* the index that the IO may be waiting for */
362 575461 : unsigned waiting_index = (io->writer_index + 1) % io->io_max;
363 :
364 : /* the index that worker just completed */
365 575461 : unsigned done_index = worker->index;
366 :
367 : /* get the new working task */
368 575461 : worker->index = next_index;
369 575461 : task = &worker->task_map[worker->index];
370 :
371 : /* if the just completed task is at this index */
372 575461 : if (done_index == waiting_index) {
373 : /* notify the IO that a new write is complete */
374 4426 : thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
375 : } else {
376 571035 : thread_mutex_unlock(&io->io_mutex);
377 : }
378 :
379 : /* return the new task */
380 575461 : return task;
381 : }
382 :
383 : /*
384 : * Check if the worker has to exit
385 : * but only if there is no work to do
386 : */
387 559192 : if (io->done) {
388 456 : thread_mutex_unlock(&io->io_mutex);
389 456 : return 0;
390 : }
391 :
392 : /* otherwise wait for a write_sched event */
393 558736 : thread_cond_wait(&io->write_sched, &io->io_mutex);
394 : }
395 : }
396 :
397 : /**
398 : * Get the next block position to operate on.
399 : *
400 : * This is the synchronization point for workers with the io.
401 : */
402 158798 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
403 : {
404 : block_off_t blockcur_schedule;
405 : block_off_t blockcur_caller;
406 : unsigned i;
407 :
408 : /* get the next parity position to process */
409 158798 : blockcur_schedule = io_position_next(io);
410 :
411 : /* ensure that all data/parity was read */
412 158798 : assert(io->reader_list[0] == io->reader_max);
413 :
414 : /* setup the list of workers to process */
415 1463173 : for (i = 0; i <= io->reader_max; ++i)
416 1304375 : io->reader_list[i] = i;
417 :
418 : /* the synchronization is protected by the io mutex */
419 158798 : thread_mutex_lock(&io->io_mutex);
420 :
421 : /* schedule the next read */
422 158798 : io_reader_sched(io, io->reader_index, blockcur_schedule);
423 :
424 : /* set the index for the tasks to return to the caller */
425 158798 : io->reader_index = (io->reader_index + 1) % io->io_max;
426 :
427 : /* get the position to operate at high level from one task */
428 158798 : blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
429 :
430 : /* set the buffer to use */
431 158798 : *buffer = io->buffer_map[io->reader_index];
432 :
433 : /* signal all the workers that there is a new pending task */
434 158798 : thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
435 :
436 158798 : return blockcur_caller;
437 : }
438 :
439 122421 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
440 : {
441 : (void)io;
442 : (void)blockcur;
443 : (void)skip;
444 122421 : }
445 :
446 122421 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
447 : {
448 : unsigned i;
449 :
450 : /* ensure that all parity was written */
451 122421 : assert(io->writer_list[0] == io->writer_max);
452 :
453 : /* setup the list of workers to process */
454 820303 : for (i = 0; i <= io->writer_max; ++i)
455 697882 : io->writer_list[i] = i;
456 :
457 : /* the synchronization is protected by the io mutex */
458 122421 : thread_mutex_lock(&io->io_mutex);
459 :
460 : /* report errors */
461 612105 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
462 489684 : writer_error[i] = io->writer_error[i];
463 489684 : io->writer_error[i] = 0;
464 : }
465 :
466 122421 : if (skip) {
467 : /* skip the next write */
468 19 : io_writer_sched_empty(io, io->writer_index, blockcur);
469 : } else {
470 : /* schedule the next write */
471 122402 : io_writer_sched(io, io->writer_index, blockcur);
472 : }
473 :
474 : /* at this point the writers must be in sync with the readers */
475 122421 : assert(io->writer_index == io->reader_index);
476 :
477 : /* set the index to be used for the next write */
478 122421 : io->writer_index = (io->writer_index + 1) % io->io_max;
479 :
480 : /* signal all the workers that there is a new pending task */
481 122421 : thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
482 122421 : }
483 :
484 0 : static void io_refresh_thread(struct snapraid_io* io)
485 : {
486 : unsigned i;
487 :
488 : /* the synchronization is protected by the io mutex */
489 0 : thread_mutex_lock(&io->io_mutex);
490 :
491 : /* for all readers, count the number of read blocks */
492 0 : for (i = 0; i < io->reader_max; ++i) {
493 : unsigned begin, end, cached;
494 0 : struct snapraid_worker* worker = &io->reader_map[i];
495 :
496 : /* the first block read */
497 0 : begin = io->reader_index + 1;
498 : /* the block in reading */
499 0 : end = worker->index;
500 0 : if (begin > end)
501 0 : end += io->io_max;
502 0 : cached = end - begin;
503 :
504 0 : if (worker->parity_handle)
505 0 : io->state->parity[worker->parity_handle->level].cached_blocks = cached;
506 : else
507 0 : worker->handle->disk->cached_blocks = cached;
508 : }
509 :
510 : /*
511 : * For all writers, count the number of written blocks
512 : * note that this is a kind of "opposite" of cached blocks
513 : */
514 0 : for (i = 0; i < io->writer_max; ++i) {
515 : unsigned begin, end, cached;
516 0 : struct snapraid_worker* worker = &io->writer_map[i];
517 :
518 : /* the first block written */
519 0 : begin = io->writer_index + 1;
520 : /* the block in writing */
521 0 : end = worker->index;
522 0 : if (begin > end)
523 0 : end += io->io_max;
524 0 : cached = end - begin;
525 :
526 0 : io->state->parity[worker->parity_handle->level].cached_blocks = cached;
527 : }
528 :
529 0 : thread_mutex_unlock(&io->io_mutex);
530 0 : }
531 :
532 116305 : static void io_flush_thread(struct snapraid_io* io)
533 : {
534 : unsigned i;
535 :
536 116206 : while (1) {
537 116305 : int all_done = 1;
538 :
539 : /* the synchronization is protected by the io mutex */
540 116305 : thread_mutex_lock(&io->io_mutex);
541 :
542 200917 : for (i = 0; i < io->writer_max; ++i) {
543 200818 : struct snapraid_worker* worker = &io->writer_map[i];
544 :
545 : /* get the next pending task */
546 200818 : unsigned next_index = (worker->index + 1) % io->io_max;
547 :
548 : /* if the queue of pending tasks is not empty */
549 200818 : if (next_index != io->writer_index) {
550 116206 : all_done = 0;
551 116206 : break;
552 : }
553 : }
554 :
555 116305 : thread_mutex_unlock(&io->io_mutex);
556 :
557 116305 : if (all_done)
558 99 : break;
559 :
560 : /* wait for something to complete */
561 116206 : thread_yield();
562 : }
563 99 : }
564 :
565 1144841 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
566 : {
567 : unsigned waiting_cycle;
568 :
569 : /* count the waiting cycle */
570 1144841 : waiting_cycle = 0;
571 :
572 : /* clear the waiting indexes */
573 1144841 : *waiting_mac = 0;
574 :
575 : /* the synchronization is protected by the io mutex */
576 1144841 : thread_mutex_lock(&io->io_mutex);
577 :
578 1147 : while (1) {
579 : unsigned char* let;
580 : unsigned busy_index;
581 :
582 : /*
583 : * Get the index the IO is using
584 : * we must ensure that this index has not a read in progress
585 : * to avoid a concurrent access
586 : */
587 1145988 : busy_index = io->reader_index;
588 :
589 : /* search for a worker that has already finished */
590 1145988 : let = &io->reader_list[0];
591 7048 : while (1) {
592 1153036 : unsigned i = *let;
593 :
594 : /* if we are at the end */
595 1153036 : if (i == io->reader_max)
596 1147 : break;
597 :
598 : /* if it's in range */
599 1151889 : if (base <= i && i < base + count) {
600 : struct snapraid_worker* worker;
601 :
602 : /* if it's the first cycle */
603 1148282 : if (waiting_cycle == 0) {
604 : /* store the waiting indexes */
605 1147057 : waiting_map[(*waiting_mac)++] = i - base;
606 : }
607 :
608 1148282 : worker = &io->reader_map[i];
609 :
610 : /* if the worker has finished this index */
611 1148282 : if (busy_index != worker->index) {
612 : struct snapraid_task* task;
613 :
614 1144841 : task = &worker->task_map[io->reader_index];
615 :
616 1144841 : thread_mutex_unlock(&io->io_mutex);
617 :
618 : /*
619 : * Mark the worker as processed
620 : * setting the previous one to point at the next one
621 : */
622 1144841 : *let = io->reader_list[i + 1];
623 :
624 : /* return the position */
625 1144841 : *pos = i - base;
626 :
627 : /* on the first cycle, no one is waiting */
628 1144841 : if (waiting_cycle == 0)
629 1143696 : *waiting_mac = 0;
630 :
631 1144841 : return task;
632 : }
633 : }
634 :
635 : /* next position to check */
636 7048 : let = &io->reader_list[i + 1];
637 : }
638 :
639 : /* if no worker is ready, wait for an event */
640 1147 : thread_cond_wait(&io->read_done, &io->io_mutex);
641 :
642 : /* count the cycles */
643 1147 : ++waiting_cycle;
644 : }
645 : }
646 :
647 932007 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
648 : {
649 932007 : return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
650 : }
651 :
652 212834 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
653 : {
654 212834 : return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
655 : }
656 :
657 575461 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
658 : {
659 : unsigned waiting_cycle;
660 :
661 : /* count the waiting cycle */
662 575461 : waiting_cycle = 0;
663 :
664 : /* clear the waiting indexes */
665 575461 : *waiting_mac = 0;
666 :
667 : /* the synchronization is protected by the io mutex */
668 575461 : thread_mutex_lock(&io->io_mutex);
669 :
670 4409 : while (1) {
671 : unsigned char* let;
672 : unsigned busy_index;
673 :
674 : /*
675 : * Get the next index the IO is going to use
676 : * we must ensure that this index has not a write in progress
677 : * to avoid a concurrent access
678 : * note that we are already sure that a write is not in progress
679 : * at the index the IO is using at now
680 : */
681 579870 : busy_index = (io->writer_index + 1) % io->io_max;
682 :
683 : /* search for a worker that has already finished */
684 579870 : let = &io->writer_list[0];
685 4409 : while (1) {
686 584279 : unsigned i = *let;
687 : struct snapraid_worker* worker;
688 :
689 : /* if we are at the end */
690 584279 : if (i == io->writer_max)
691 4409 : break;
692 :
693 : /* if it's the first cycle */
694 579870 : if (waiting_cycle == 0) {
695 : /* store the waiting indexes */
696 575461 : waiting_map[(*waiting_mac)++] = i;
697 : }
698 :
699 579870 : worker = &io->writer_map[i];
700 :
701 : /* the two indexes cannot be equal */
702 579870 : assert(io->writer_index != worker->index);
703 :
704 : /* if the worker has finished this index */
705 579870 : if (busy_index != worker->index) {
706 575461 : thread_mutex_unlock(&io->io_mutex);
707 :
708 : /*
709 : * Mark the worker as processed
710 : * setting the previous one to point at the next one
711 : */
712 575461 : *let = io->writer_list[i + 1];
713 :
714 : /* return the position */
715 575461 : *pos = i;
716 :
717 : /* on the first cycle, no one is waiting */
718 575461 : if (waiting_cycle == 0)
719 571052 : *waiting_mac = 0;
720 :
721 575461 : return;
722 : }
723 :
724 : /* next position to check */
725 4409 : let = &io->writer_list[i + 1];
726 : }
727 :
728 : /* if no worker is ready, wait for an event */
729 4409 : thread_cond_wait(&io->write_done, &io->io_mutex);
730 :
731 : /* count the cycles */
732 4409 : ++waiting_cycle;
733 : }
734 : }
735 :
736 1144979 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
737 : {
738 : /* if we reached the end */
739 1144979 : if (task->position >= worker->io->block_max) {
740 : /* complete a dummy task */
741 138 : task->state = TASK_STATE_EMPTY;
742 : } else {
743 1144841 : worker->func(worker, task);
744 : }
745 1144979 : }
746 :
747 736 : static void* io_reader_thread(void* arg)
748 : {
749 736 : struct snapraid_worker* worker = arg;
750 :
751 : /* force completion of the first task */
752 736 : io_reader_worker(worker, &worker->task_map[0]);
753 :
754 1238112 : while (1) {
755 : struct snapraid_task* task;
756 :
757 : /* get the new task */
758 1238848 : task = io_reader_step(worker);
759 :
760 : /* if no task, it means to exit */
761 1238782 : if (!task)
762 670 : break;
763 :
764 : /* nothing more to do */
765 1238112 : if (task->state == TASK_STATE_EMPTY)
766 93869 : continue;
767 :
768 1144243 : assert(task->state == TASK_STATE_READY);
769 :
770 : /* work on the assigned task */
771 1144243 : io_reader_worker(worker, task);
772 : }
773 :
774 670 : return 0;
775 : }
776 :
777 513 : static void* io_writer_thread(void* arg)
778 : {
779 513 : struct snapraid_worker* worker = arg;
780 513 : int latest_state = TASK_STATE_DONE;
781 :
782 575461 : while (1) {
783 : struct snapraid_task* task;
784 :
785 : /* get the new task */
786 575974 : task = io_writer_step(worker, latest_state);
787 :
788 : /* if no task, it means to exit */
789 575917 : if (!task)
790 456 : break;
791 :
792 : /* nothing more to do */
793 575461 : if (task->state == TASK_STATE_EMPTY) {
794 114 : latest_state = TASK_STATE_DONE;
795 114 : continue;
796 : }
797 :
798 575347 : assert(task->state == TASK_STATE_READY);
799 :
800 : /* work on the assigned task */
801 575347 : worker->func(worker, task);
802 :
803 : /* save the resulting state */
804 575347 : latest_state = task->state;
805 : }
806 :
807 456 : return 0;
808 : }
809 :
810 113 : static void io_start_thread(struct snapraid_io* io,
811 : block_off_t blockstart, block_off_t blockmax,
812 : bit_vect_t* block_enabled)
813 : {
814 : unsigned i;
815 :
816 113 : io->block_start = blockstart;
817 113 : io->block_max = blockmax;
818 113 : io->block_enabled = block_enabled;
819 113 : io->block_next = blockstart;
820 :
821 113 : io->done = 0;
822 113 : io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
823 113 : io->writer_index = 0;
824 :
825 : /* clear writer errors */
826 565 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
827 452 : io->writer_error[i] = 0;
828 :
829 : /*
830 : * Setup the initial read pending tasks, except the latest one,
831 : * the latest will be initialized at the fist io_read_next() call
832 : */
833 14464 : for (i = 0; i < io->io_max - 1; ++i) {
834 14351 : block_off_t blockcur = io_position_next(io);
835 :
836 14351 : io_reader_sched(io, i, blockcur);
837 : }
838 :
839 : /* setup the lists of workers to process */
840 113 : io->reader_list[0] = io->reader_max;
841 739 : for (i = 0; i <= io->writer_max; ++i)
842 626 : io->writer_list[i] = i;
843 :
844 : /* start the reader threads */
845 849 : for (i = 0; i < io->reader_max; ++i) {
846 736 : struct snapraid_worker* worker = &io->reader_map[i];
847 :
848 736 : worker->index = 0;
849 :
850 736 : thread_create(&worker->thread, io_reader_thread, worker);
851 : }
852 :
853 : /* start the writer threads */
854 626 : for (i = 0; i < io->writer_max; ++i) {
855 513 : struct snapraid_worker* worker = &io->writer_map[i];
856 :
857 513 : worker->index = io->io_max - 1;
858 :
859 513 : thread_create(&worker->thread, io_writer_thread, worker);
860 : }
861 113 : }
862 :
863 101 : static void io_stop_thread(struct snapraid_io* io)
864 : {
865 : unsigned i;
866 :
867 101 : thread_mutex_lock(&io->io_mutex);
868 :
869 : /* mark that we are stopping */
870 101 : io->done = 1;
871 :
872 : /* signal all the threads to recognize the new state */
873 101 : thread_cond_broadcast(&io->read_sched);
874 101 : thread_cond_broadcast(&io->write_sched);
875 :
876 101 : thread_mutex_unlock(&io->io_mutex);
877 :
878 : /* wait for all readers to terminate */
879 771 : for (i = 0; i < io->reader_max; ++i) {
880 670 : struct snapraid_worker* worker = &io->reader_map[i];
881 : void* retval;
882 :
883 : /* wait for thread termination */
884 670 : thread_join(worker->thread, &retval);
885 : }
886 :
887 : /* wait for all writers to terminate */
888 557 : for (i = 0; i < io->writer_max; ++i) {
889 456 : struct snapraid_worker* worker = &io->writer_map[i];
890 : void* retval;
891 :
892 : /* wait for thread termination */
893 456 : thread_join(worker->thread, &retval);
894 : }
895 101 : }
896 :
897 : #endif
898 :
899 : /*****************************************************************************/
900 : /* global */
901 :
902 115 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
903 : unsigned io_cache, unsigned buffer_max,
904 : void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
905 : struct snapraid_handle* handle_map, unsigned handle_max,
906 : void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
907 : void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
908 : struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
909 : {
910 : unsigned i;
911 : size_t allocated_size;
912 115 : size_t block_size = state->block_size;
913 :
914 115 : io->state = state;
915 :
916 115 : assert(buffer_max >= handle_max + parity_handle_max);
917 :
918 : /* initialize bandwidth limiting */
919 115 : bw_init(&io->bw, state->opt.bwlimit);
920 :
921 : /* set IO context in handles */
922 778 : for (i = 0; i < handle_max; ++i)
923 663 : handle_map[i].bw = &io->bw;
924 715 : for (i = 0; i < parity_handle_max; ++i)
925 600 : parity_handle_map[i].bw = &io->bw;
926 :
927 : #if HAVE_THREAD
928 115 : if (io_cache == 0) {
929 : /*
930 : * Default is 16 MiB of cache
931 : * this seems to be a good tradeoff between speed and memory usage
932 : */
933 113 : io->io_max = 16 * 1024 * 1024 / state->block_size;
934 113 : if (io->io_max < IO_MIN)
935 0 : io->io_max = IO_MIN;
936 113 : if (io->io_max > IO_MAX)
937 113 : io->io_max = IO_MAX;
938 : } else {
939 2 : io->io_max = io_cache;
940 : }
941 : #else
942 : (void)io_cache;
943 :
944 : /* without pthread force the mono thread mode */
945 : io->io_max = 1;
946 : #endif
947 :
948 115 : assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
949 :
950 115 : io->buffer_max = buffer_max;
951 115 : allocated_size = 0;
952 14581 : for (i = 0; i < io->io_max; ++i) {
953 14466 : if (state->file_mode != ADVISE_DIRECT)
954 14338 : io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
955 : else
956 128 : io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
957 14466 : if (!state->opt.skip_self)
958 0 : mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
959 14466 : allocated_size += block_size * buffer_max;
960 : }
961 :
962 115 : msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);
963 :
964 115 : if (parity_writer) {
965 100 : io->reader_max = handle_max;
966 100 : io->writer_max = parity_handle_max;
967 : } else {
968 15 : io->reader_max = handle_max + parity_handle_max;
969 15 : io->writer_max = 0;
970 : }
971 :
972 115 : io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
973 115 : io->reader_list = malloc_nofail(io->reader_max + 1);
974 115 : io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
975 115 : io->writer_list = malloc_nofail(io->writer_max + 1);
976 :
977 115 : io->data_base = 0;
978 115 : io->data_count = handle_max;
979 115 : io->parity_base = handle_max;
980 115 : io->parity_count = parity_handle_max;
981 :
982 863 : for (i = 0; i < io->reader_max; ++i) {
983 748 : struct snapraid_worker* worker = &io->reader_map[i];
984 :
985 748 : worker->io = io;
986 :
987 748 : if (i < handle_max) {
988 : /* it's a data read */
989 663 : worker->handle = &handle_map[i];
990 663 : worker->parity_handle = 0;
991 663 : worker->func = data_reader;
992 :
993 : /* data read is put in lower buffer index */
994 663 : worker->buffer_skew = 0;
995 : } else {
996 : /* it's a parity read */
997 85 : worker->handle = 0;
998 85 : worker->parity_handle = &parity_handle_map[i - handle_max];
999 85 : worker->func = parity_reader;
1000 :
1001 : /* parity read is put after data and computed parity */
1002 85 : worker->buffer_skew = parity_handle_max;
1003 : }
1004 : }
1005 :
1006 630 : for (i = 0; i < io->writer_max; ++i) {
1007 515 : struct snapraid_worker* worker = &io->writer_map[i];
1008 :
1009 515 : worker->io = io;
1010 :
1011 : /* it's a parity write */
1012 515 : worker->handle = 0;
1013 515 : worker->parity_handle = &parity_handle_map[i];
1014 515 : worker->func = parity_writer;
1015 :
1016 : /* parity to write is put after data */
1017 515 : worker->buffer_skew = handle_max;
1018 : }
1019 :
1020 : #if HAVE_THREAD
1021 115 : if (io->io_max > 1) {
1022 113 : io_read_next = io_read_next_thread;
1023 113 : io_write_preset = io_write_preset_thread;
1024 113 : io_write_next = io_write_next_thread;
1025 113 : io_refresh = io_refresh_thread;
1026 113 : io_flush = io_flush_thread;
1027 113 : io_data_read = io_data_read_thread;
1028 113 : io_parity_read = io_parity_read_thread;
1029 113 : io_parity_write = io_parity_write_thread;
1030 113 : io_start = io_start_thread;
1031 113 : io_stop = io_stop_thread;
1032 :
1033 113 : thread_mutex_init(&io->io_mutex);
1034 113 : thread_cond_init(&io->read_done);
1035 113 : thread_cond_init(&io->read_sched);
1036 113 : thread_cond_init(&io->write_done);
1037 113 : thread_cond_init(&io->write_sched);
1038 : } else
1039 : #endif
1040 : {
1041 2 : io_read_next = io_read_next_mono;
1042 2 : io_write_preset = io_write_preset_mono;
1043 2 : io_write_next = io_write_next_mono;
1044 2 : io_refresh = io_refresh_mono;
1045 2 : io_flush = io_flush_mono;
1046 2 : io_data_read = io_data_read_mono;
1047 2 : io_parity_read = io_parity_read_mono;
1048 2 : io_parity_write = io_parity_write_mono;
1049 2 : io_start = io_start_mono;
1050 2 : io_stop = io_stop_mono;
1051 : }
1052 115 : }
1053 :
1054 103 : void io_done(struct snapraid_io* io)
1055 : {
1056 : unsigned i;
1057 :
1058 13033 : for (i = 0; i < io->io_max; ++i) {
1059 12930 : free(io->buffer_map[i]);
1060 12930 : free(io->buffer_alloc_map[i]);
1061 : }
1062 :
1063 103 : free(io->reader_map);
1064 103 : free(io->reader_list);
1065 103 : free(io->writer_map);
1066 103 : free(io->writer_list);
1067 :
1068 : #if HAVE_THREAD
1069 103 : if (io->io_max > 1) {
1070 101 : thread_mutex_destroy(&io->io_mutex);
1071 101 : thread_cond_destroy(&io->read_done);
1072 101 : thread_cond_destroy(&io->read_sched);
1073 101 : thread_cond_destroy(&io->write_done);
1074 101 : thread_cond_destroy(&io->write_sched);
1075 : }
1076 : #endif
1077 103 : }
1078 :
|