Line data Source code
1 : /*
2 : * Copyright (C) 2016 Andrea Mazzoleni
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "portable.h"
19 :
20 : #include "io.h"
21 :
/**
 * IO backend dispatch table.
 *
 * These function pointers are bound by io_init() to either the
 * mono-thread (io_*_mono) or the multi-thread (io_*_thread)
 * implementation, depending on io->io_max and pthread availability.
 */
void (*io_start)(struct snapraid_io* io,
	block_off_t blockstart, block_off_t blockmax,
	bit_vect_t* block_enabled) = 0;
void (*io_stop)(struct snapraid_io* io) = 0;
block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
void (*io_refresh)(struct snapraid_io* io) = 0;
void (*io_flush)(struct snapraid_io* io) = 0;
34 :
35 : /**
36 : * Get the next block position to process.
37 : */
38 174856 : static block_off_t io_position_next(struct snapraid_io* io)
39 : {
40 : block_off_t blockcur;
41 :
42 : /* get the next position */
43 174856 : if (io->block_enabled) {
44 532189 : while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
45 367234 : ++io->block_next;
46 : }
47 :
48 174856 : blockcur = io->block_next;
49 :
50 : /* next block for the next call */
51 174856 : ++io->block_next;
52 :
53 174856 : return blockcur;
54 : }
55 :
56 : /**
57 : * Setup the next pending task for all readers.
58 : */
59 174856 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
60 : {
61 : unsigned i;
62 :
63 1446628 : for (i = 0; i < io->reader_max; ++i) {
64 1271772 : struct snapraid_worker* worker = &io->reader_map[i];
65 1271772 : struct snapraid_task* task = &worker->task_map[task_index];
66 :
67 : /* setup the new pending task */
68 1271772 : if (blockcur < io->block_max)
69 1181136 : task->state = TASK_STATE_READY;
70 : else
71 90636 : task->state = TASK_STATE_EMPTY;
72 :
73 1271772 : task->path[0] = 0;
74 1271772 : if (worker->handle)
75 1049136 : task->disk = worker->handle->disk;
76 : else
77 222636 : task->disk = 0;
78 :
79 1271772 : assert(worker->buffer_skew + i < io->buffer_max);
80 :
81 1271772 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
82 1271772 : task->position = blockcur;
83 1271772 : task->block = 0;
84 1271772 : task->file = 0;
85 1271772 : task->file_pos = 0;
86 1271772 : task->read_size = 0;
87 1271772 : task->is_timestamp_different = 0;
88 : }
89 174856 : }
90 :
91 : /**
92 : * Setup the next pending task for all writers.
93 : */
94 125307 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
95 : {
96 : unsigned i;
97 :
98 699969 : for (i = 0; i < io->writer_max; ++i) {
99 574662 : struct snapraid_worker* worker = &io->writer_map[i];
100 574662 : struct snapraid_task* task = &worker->task_map[task_index];
101 :
102 : /* setup the new pending task */
103 574662 : task->state = TASK_STATE_READY;
104 574662 : task->path[0] = 0;
105 574662 : task->disk = 0;
106 574662 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
107 574662 : task->position = blockcur;
108 574662 : task->block = 0;
109 574662 : task->file = 0;
110 574662 : task->file_pos = 0;
111 574662 : task->read_size = 0;
112 574662 : task->is_timestamp_different = 0;
113 : }
114 125307 : }
115 :
116 : /**
117 : * Setup an empty next pending task for all writers.
118 : */
119 921 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
120 : {
121 : unsigned i;
122 :
123 6447 : for (i = 0; i < io->writer_max; ++i) {
124 5526 : struct snapraid_worker* worker = &io->writer_map[i];
125 5526 : struct snapraid_task* task = &worker->task_map[task_index];
126 :
127 : /* setup the new pending task */
128 5526 : task->state = TASK_STATE_EMPTY;
129 5526 : task->path[0] = 0;
130 5526 : task->disk = 0;
131 5526 : task->buffer = 0;
132 5526 : task->position = blockcur;
133 5526 : task->block = 0;
134 5526 : task->file = 0;
135 5526 : task->file_pos = 0;
136 5526 : task->read_size = 0;
137 5526 : task->is_timestamp_different = 0;
138 : }
139 921 : }
140 :
141 : /*****************************************************************************/
142 : /* mono thread */
143 :
144 9376 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
145 : {
146 : block_off_t blockcur_schedule;
147 :
148 : /* reset the index */
149 9376 : io->reader_index = 0;
150 :
151 9376 : blockcur_schedule = io_position_next(io);
152 :
153 : /* schedule the next read */
154 9376 : io_reader_sched(io, 0, blockcur_schedule);
155 :
156 : /* set the buffer to use */
157 9376 : *buffer = io->buffer_map[0];
158 :
159 9376 : return blockcur_schedule;
160 : }
161 :
162 9374 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
163 : {
164 : unsigned i;
165 :
166 : /* reset the index */
167 9374 : io->writer_index = 0;
168 :
169 : /* clear errors */
170 46870 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
171 37496 : io->writer_error[i] = 0;
172 :
173 9374 : if (skip) {
174 : /* skip the next write */
175 0 : io_writer_sched_empty(io, 0, blockcur);
176 : } else {
177 : /* schedule the next write */
178 9374 : io_writer_sched(io, 0, blockcur);
179 : }
180 9374 : }
181 :
182 9374 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
183 : {
184 : unsigned i;
185 :
186 : (void)blockcur;
187 : (void)skip;
188 :
189 : /* report errors */
190 46870 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
191 37496 : writer_error[i] = io->writer_error[i];
192 9374 : }
193 :
/**
 * Refresh the cached-block counters, mono-thread version.
 *
 * Nothing to do: with a single task slot there is no queue of
 * cached blocks to report.
 */
static void io_refresh_mono(struct snapraid_io* io)
{
	(void)io;
}
198 :
/**
 * Flush pending writes, mono-thread version.
 *
 * Nothing to do: writes complete synchronously in io_parity_write_mono().
 */
static void io_flush_mono(struct snapraid_io* io)
{
	(void)io;
}
203 :
204 56244 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
205 : {
206 : struct snapraid_worker* worker;
207 : struct snapraid_task* task;
208 : unsigned i;
209 :
210 : /* get the next task */
211 56244 : i = io->reader_index++;
212 :
213 56244 : assert(base <= i && i < base + count);
214 :
215 56244 : worker = &io->reader_map[i];
216 56244 : task = &worker->task_map[0];
217 :
218 : /* do the work */
219 56244 : if (task->state != TASK_STATE_EMPTY)
220 56244 : worker->func(worker, task);
221 :
222 : /* return the position */
223 56244 : *pos = i - base;
224 :
225 : /* store the waiting index */
226 56244 : waiting_map[0] = i - base;
227 56244 : *waiting_mac = 1;
228 :
229 56244 : return task;
230 : }
231 :
/**
 * Read the next data block, dispatching to the data reader range.
 */
static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
}
236 :
/**
 * Read the next parity block, dispatching to the parity reader range.
 */
static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
}
241 :
242 9374 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
243 : {
244 : struct snapraid_worker* worker;
245 : struct snapraid_task* task;
246 : unsigned i;
247 :
248 : /* get the next task */
249 9374 : i = io->writer_index++;
250 :
251 9374 : worker = &io->writer_map[i];
252 9374 : task = &worker->task_map[0];
253 :
254 9374 : io->writer_error[i] = 0;
255 :
256 : /* do the work */
257 9374 : if (task->state != TASK_STATE_EMPTY)
258 9374 : worker->func(worker, task);
259 :
260 : /* return the position */
261 9374 : *pos = i;
262 :
263 : /* store the waiting index */
264 9374 : waiting_map[0] = i;
265 9374 : *waiting_mac = 1;
266 9374 : }
267 :
268 2 : static void io_start_mono(struct snapraid_io* io,
269 : block_off_t blockstart, block_off_t blockmax,
270 : bit_vect_t* block_enabled)
271 : {
272 2 : io->block_start = blockstart;
273 2 : io->block_max = blockmax;
274 2 : io->block_enabled = block_enabled;
275 2 : io->block_next = blockstart;
276 2 : }
277 :
/**
 * Stop the io, mono-thread version.
 *
 * Nothing to do: there are no worker threads to join.
 */
static void io_stop_mono(struct snapraid_io* io)
{
	(void)io;
}
282 :
283 : /*****************************************************************************/
284 : /* multi thread */
285 :
286 : /* disable multithread if pthread is not present */
287 : #if HAVE_THREAD
288 :
/**
 * Get the next task to work on for a reader.
 *
 * This is the synchronization point for workers with the io.
 *
 * Returns the next pending task, or 0 when the worker has to exit.
 * When the task the worker just completed is the one the IO thread is
 * waiting for, the read_done condition is signaled.
 */
static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
{
	struct snapraid_io* io = worker->io;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned next_index;

		/* check if the worker has to exit */
		/* even if there is work to do */
		if (io->done) {
			thread_mutex_unlock(&io->io_mutex);
			return 0;
		}

		/* get the next pending task */
		next_index = (worker->index + 1) % io->io_max;

		/* if the queue of pending tasks is not empty */
		if (next_index != io->reader_index) {
			struct snapraid_task* task;

			/* the index that the IO may be waiting for */
			unsigned waiting_index = io->reader_index;

			/* the index that worker just completed */
			unsigned done_index = worker->index;

			/* get the new working task */
			worker->index = next_index;
			task = &worker->task_map[worker->index];

			/* if the just completed task is at this index */
			if (done_index == waiting_index) {
				/* notify the IO that a new read is complete */
				thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
			} else {
				thread_mutex_unlock(&io->io_mutex);
			}

			/* return the new task */
			return task;
		}

		/* otherwise wait for a read_sched event */
		thread_cond_wait(&io->read_sched, &io->io_mutex);
	}
}
344 :
/**
 * Get the next task to work on for a writer.
 *
 * This is the synchronization point for workers with the io.
 *
 * The @state argument is the completion state of the worker's previous
 * task; states in the IO_WRITER_ERROR_BASE range are accumulated into
 * io->writer_error[] for later reporting by io_write_next_thread().
 *
 * Returns the next pending task, or 0 when the worker has to exit.
 * Note that exit is checked only when there is no more work to do,
 * so all scheduled writes are completed before termination.
 */
static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
{
	struct snapraid_io* io = worker->io;
	int error_index;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* counts the number of errors in the global state */
	error_index = state - IO_WRITER_ERROR_BASE;
	if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
		++io->writer_error[error_index];

	while (1) {
		unsigned next_index;

		/* get the next pending task */
		next_index = (worker->index + 1) % io->io_max;

		/* if the queue of pending tasks is not empty */
		if (next_index != io->writer_index) {
			struct snapraid_task* task;

			/* the index that the IO may be waiting for */
			unsigned waiting_index = (io->writer_index + 1) % io->io_max;

			/* the index that worker just completed */
			unsigned done_index = worker->index;

			/* get the new working task */
			worker->index = next_index;
			task = &worker->task_map[worker->index];

			/* if the just completed task is at this index */
			if (done_index == waiting_index) {
				/* notify the IO that a new write is complete */
				thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
			} else {
				thread_mutex_unlock(&io->io_mutex);
			}

			/* return the new task */
			return task;
		}

		/* check if the worker has to exit */
		/* but only if there is no work to do */
		if (io->done) {
			thread_mutex_unlock(&io->io_mutex);
			return 0;
		}

		/* otherwise wait for a write_sched event */
		thread_cond_wait(&io->write_sched, &io->io_mutex);
	}
}
406 :
/**
 * Get the next block position to operate on.
 *
 * This is the synchronization point for workers with the io.
 *
 * Schedules a new read at the tail of the queue, then returns the
 * position of the task now at the head of the queue (i.e. the one
 * scheduled io_max - 1 calls earlier), together with its buffer set.
 */
static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
{
	block_off_t blockcur_schedule;
	block_off_t blockcur_caller;
	unsigned i;

	/* get the next parity position to process */
	blockcur_schedule = io_position_next(io);

	/* ensure that all data/parity was read */
	assert(io->reader_list[0] == io->reader_max);

	/* setup the list of workers to process */
	for (i = 0; i <= io->reader_max; ++i)
		io->reader_list[i] = i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* schedule the next read */
	io_reader_sched(io, io->reader_index, blockcur_schedule);

	/* set the index for the tasks to return to the caller */
	io->reader_index = (io->reader_index + 1) % io->io_max;

	/* get the position to operate at high level from one task */
	blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;

	/* set the buffer to use */
	*buffer = io->buffer_map[io->reader_index];

	/* signal all the workers that there is a new pending task */
	thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);

	return blockcur_caller;
}
448 :
/**
 * Preset the next write, multi-thread version.
 *
 * Nothing to do here: scheduling and error clearing are done in
 * io_write_next_thread() instead.
 */
static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
{
	(void)io;
	(void)blockcur;
	(void)skip;
}
455 :
/**
 * Schedule the next write and report the accumulated writer errors.
 *
 * This is the synchronization point for writer workers with the io.
 * The errors collected by io_writer_step() since the previous call are
 * copied into @writer_error and then reset.
 */
static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
{
	unsigned i;

	/* ensure that all parity was written */
	assert(io->writer_list[0] == io->writer_max);

	/* setup the list of workers to process */
	for (i = 0; i <= io->writer_max; ++i)
		io->writer_list[i] = i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* report errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
		writer_error[i] = io->writer_error[i];
		io->writer_error[i] = 0;
	}

	if (skip) {
		/* skip the next write */
		io_writer_sched_empty(io, io->writer_index, blockcur);
	} else {
		/* schedule the next write */
		io_writer_sched(io, io->writer_index, blockcur);
	}

	/* at this point the writers must be in sync with the readers */
	assert(io->writer_index == io->reader_index);

	/* set the index to be used for the next write */
	io->writer_index = (io->writer_index + 1) % io->io_max;

	/* signal all the workers that there is a new pending task */
	thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
}
493 :
/**
 * Update the cached_blocks counters of disks and parities from the
 * fill level of the reader/writer queues, for progress reporting.
 */
static void io_refresh_thread(struct snapraid_io* io)
{
	unsigned i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* for all readers, count the number of read blocks */
	for (i = 0; i < io->reader_max; ++i) {
		unsigned begin, end, cached;
		struct snapraid_worker* worker = &io->reader_map[i];

		/* the first block read */
		begin = io->reader_index + 1;
		/* the block in reading */
		end = worker->index;
		if (begin > end)
			end += io->io_max;
		cached = end - begin;

		/* a reader is either a parity reader or a data reader */
		if (worker->parity_handle)
			io->state->parity[worker->parity_handle->level].cached_blocks = cached;
		else
			worker->handle->disk->cached_blocks = cached;
	}

	/* for all writers, count the number of written blocks */
	/* note that this is a kind of "opposite" of cached blocks */
	for (i = 0; i < io->writer_max; ++i) {
		unsigned begin, end, cached;
		struct snapraid_worker* worker = &io->writer_map[i];

		/* the first block written */
		begin = io->writer_index + 1;
		/* the block in writing */
		end = worker->index;
		if (begin > end)
			end += io->io_max;
		cached = end - begin;

		/* writers are always parity writers */
		io->state->parity[worker->parity_handle->level].cached_blocks = cached;
	}

	thread_mutex_unlock(&io->io_mutex);
}
539 :
/**
 * Flush the pending writes, blocking until every writer queue is empty.
 *
 * NOTE(review): this is a poll-and-yield loop rather than a wait on
 * the write_done condition; the mutex is taken only to inspect the
 * queue state on each iteration.
 */
static void io_flush_thread(struct snapraid_io* io)
{
	unsigned i;

	while (1) {
		int all_done = 1;

		/* the synchronization is protected by the io mutex */
		thread_mutex_lock(&io->io_mutex);

		for (i = 0; i < io->writer_max; ++i) {
			struct snapraid_worker* worker = &io->writer_map[i];

			/* get the next pending task */
			unsigned next_index = (worker->index + 1) % io->io_max;

			/* if the queue of pending tasks is not empty */
			if (next_index != io->writer_index) {
				all_done = 0;
				break;
			}
		}

		thread_mutex_unlock(&io->io_mutex);

		if (all_done)
			break;

		/* wait for something to complete */
		thread_yield();
	}
}
572 :
/**
 * Get a completed read task from a worker in the [base, base+count) range.
 *
 * Blocks until one of the in-range workers has finished the task at the
 * current reader_index. The reader_list array is a linked list over the
 * worker indexes; a returned worker is unlinked so it is not returned
 * twice for the same position.
 *
 * waiting_map/waiting_mac report which in-range workers (relative to
 * @base) the caller had to wait for; they are left empty when a task
 * was available on the first pass.
 */
static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	unsigned waiting_cycle;

	/* count the waiting cycle */
	waiting_cycle = 0;

	/* clear the waiting indexes */
	*waiting_mac = 0;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned char* let;
		unsigned busy_index;

		/* get the index the IO is using */
		/* we must ensure that this index has not a read in progress */
		/* to avoid a concurrent access */
		busy_index = io->reader_index;

		/* search for a worker that has already finished */
		let = &io->reader_list[0];
		while (1) {
			unsigned i = *let;

			/* if we are at the end */
			if (i == io->reader_max)
				break;

			/* if it's in range */
			if (base <= i && i < base + count) {
				struct snapraid_worker* worker;

				/* if it's the first cycle */
				if (waiting_cycle == 0) {
					/* store the waiting indexes */
					waiting_map[(*waiting_mac)++] = i - base;
				}

				worker = &io->reader_map[i];

				/* if the worker has finished this index */
				if (busy_index != worker->index) {
					struct snapraid_task* task;

					task = &worker->task_map[io->reader_index];

					thread_mutex_unlock(&io->io_mutex);

					/* mark the worker as processed */
					/* setting the previous one to point at the next one */
					/* (reader_list is only touched by the IO thread, */
					/* so this is safe outside the lock) */
					*let = io->reader_list[i + 1];

					/* return the position */
					*pos = i - base;

					/* on the first cycle, no one is waiting */
					if (waiting_cycle == 0)
						*waiting_mac = 0;

					return task;
				}
			}

			/* next position to check */
			let = &io->reader_list[i + 1];
		}

		/* if no worker is ready, wait for an event */
		thread_cond_wait(&io->read_done, &io->io_mutex);

		/* count the cycles */
		++waiting_cycle;
	}
}
650 :
/**
 * Read the next completed data block, dispatching to the data reader range.
 */
static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
}
655 :
/**
 * Read the next completed parity block, dispatching to the parity reader range.
 */
static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
}
660 :
/**
 * Wait for a writer to complete its pending parity write.
 *
 * Blocks until one of the writers is no longer busy on the next slot
 * the IO is going to use. The writer_list array is a linked list over
 * the worker indexes; a returned worker is unlinked so it is not
 * returned twice for the same position.
 *
 * waiting_map/waiting_mac report which writers the caller had to wait
 * for; they are left empty when one was available on the first pass.
 */
static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	unsigned waiting_cycle;

	/* count the waiting cycle */
	waiting_cycle = 0;

	/* clear the waiting indexes */
	*waiting_mac = 0;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned char* let;
		unsigned busy_index;

		/* get the next index the IO is going to use */
		/* we must ensure that this index has not a write in progress */
		/* to avoid a concurrent access */
		/* note that we are already sure that a write is not in progress */
		/* at the index the IO is using at now */
		busy_index = (io->writer_index + 1) % io->io_max;

		/* search for a worker that has already finished */
		let = &io->writer_list[0];
		while (1) {
			unsigned i = *let;
			struct snapraid_worker* worker;

			/* if we are at the end */
			if (i == io->writer_max)
				break;

			/* if it's the first cycle */
			if (waiting_cycle == 0) {
				/* store the waiting indexes */
				waiting_map[(*waiting_mac)++] = i;
			}

			worker = &io->writer_map[i];

			/* the two indexes cannot be equal */
			assert(io->writer_index != worker->index);

			/* if the worker has finished this index */
			if (busy_index != worker->index) {
				thread_mutex_unlock(&io->io_mutex);

				/* mark the worker as processed */
				/* setting the previous one to point at the next one */
				/* (writer_list is only touched by the IO thread, */
				/* so this is safe outside the lock) */
				*let = io->writer_list[i + 1];

				/* return the position */
				*pos = i;

				/* on the first cycle, no one is waiting */
				if (waiting_cycle == 0)
					*waiting_mac = 0;

				return;
			}

			/* next position to check */
			let = &io->writer_list[i + 1];
		}

		/* if no worker is ready, wait for an event */
		thread_cond_wait(&io->write_done, &io->io_mutex);

		/* count the cycles */
		++waiting_cycle;
	}
}
735 :
736 1125030 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
737 : {
738 : /* if we reached the end */
739 1125030 : if (task->position >= worker->io->block_max) {
740 : /* complete a dummy task */
741 138 : task->state = TASK_STATE_EMPTY;
742 : } else {
743 1124892 : worker->func(worker, task);
744 : }
745 1125030 : }
746 :
747 708 : static void* io_reader_thread(void* arg)
748 : {
749 708 : struct snapraid_worker* worker = arg;
750 :
751 : /* force completion of the first task */
752 708 : io_reader_worker(worker, &worker->task_map[0]);
753 :
754 1214532 : while (1) {
755 : struct snapraid_task* task;
756 :
757 : /* get the new task */
758 1215240 : task = io_reader_step(worker);
759 :
760 : /* if no task, it means to exit */
761 1215240 : if (!task)
762 708 : break;
763 :
764 : /* nothing more to do */
765 1214532 : if (task->state == TASK_STATE_EMPTY)
766 90210 : continue;
767 :
768 1124322 : assert(task->state == TASK_STATE_READY);
769 :
770 : /* work on the assigned task */
771 1124322 : io_reader_worker(worker, task);
772 : }
773 :
774 708 : return 0;
775 : }
776 :
777 505 : static void* io_writer_thread(void* arg)
778 : {
779 505 : struct snapraid_worker* worker = arg;
780 505 : int latest_state = TASK_STATE_DONE;
781 :
782 570814 : while (1) {
783 : struct snapraid_task* task;
784 :
785 : /* get the new task */
786 571319 : task = io_writer_step(worker, latest_state);
787 :
788 : /* if no task, it means to exit */
789 571319 : if (!task)
790 505 : break;
791 :
792 : /* nothing more to do */
793 570814 : if (task->state == TASK_STATE_EMPTY) {
794 5526 : latest_state = TASK_STATE_DONE;
795 5526 : continue;
796 : }
797 :
798 565288 : assert(task->state == TASK_STATE_READY);
799 :
800 : /* work on the assigned task */
801 565288 : worker->func(worker, task);
802 :
803 : /* save the resulting state */
804 565288 : latest_state = task->state;
805 : }
806 :
807 505 : return 0;
808 : }
809 :
/**
 * Start the io, multi-thread version.
 *
 * Pre-schedules io_max - 1 read tasks so the readers can work ahead,
 * then spawns one thread per reader and per writer.
 */
static void io_start_thread(struct snapraid_io* io,
	block_off_t blockstart, block_off_t blockmax,
	bit_vect_t* block_enabled)
{
	unsigned i;

	io->block_start = blockstart;
	io->block_max = blockmax;
	io->block_enabled = block_enabled;
	io->block_next = blockstart;

	io->done = 0;
	io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
	io->writer_index = 0;

	/* clear writer errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
		io->writer_error[i] = 0;

	/* setup the initial read pending tasks, except the latest one, */
	/* the latest will be initialized at the first io_read_next() call */
	for (i = 0; i < io->io_max - 1; ++i) {
		block_off_t blockcur = io_position_next(io);

		io_reader_sched(io, i, blockcur);
	}

	/* setup the lists of workers to process */
	io->reader_list[0] = io->reader_max;
	for (i = 0; i <= io->writer_max; ++i)
		io->writer_list[i] = i;

	/* start the reader threads */
	for (i = 0; i < io->reader_max; ++i) {
		struct snapraid_worker* worker = &io->reader_map[i];

		worker->index = 0;

		thread_create(&worker->thread, io_reader_thread, worker);
	}

	/* start the writer threads */
	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];

		worker->index = io->io_max - 1;

		thread_create(&worker->thread, io_writer_thread, worker);
	}
}
860 :
861 104 : static void io_stop_thread(struct snapraid_io* io)
862 : {
863 : unsigned i;
864 :
865 104 : thread_mutex_lock(&io->io_mutex);
866 :
867 : /* mark that we are stopping */
868 104 : io->done = 1;
869 :
870 : /* signal all the threads to recognize the new state */
871 104 : thread_cond_broadcast(&io->read_sched);
872 104 : thread_cond_broadcast(&io->write_sched);
873 :
874 104 : thread_mutex_unlock(&io->io_mutex);
875 :
876 : /* wait for all readers to terminate */
877 812 : for (i = 0; i < io->reader_max; ++i) {
878 708 : struct snapraid_worker* worker = &io->reader_map[i];
879 : void* retval;
880 :
881 : /* wait for thread termination */
882 708 : thread_join(worker->thread, &retval);
883 : }
884 :
885 : /* wait for all writers to terminate */
886 609 : for (i = 0; i < io->writer_max; ++i) {
887 505 : struct snapraid_worker* worker = &io->writer_map[i];
888 : void* retval;
889 :
890 : /* wait for thread termination */
891 505 : thread_join(worker->thread, &retval);
892 : }
893 104 : }
894 :
895 : #endif
896 :
897 : /*****************************************************************************/
898 : /* global */
899 :
/**
 * Initialize the IO engine.
 *
 * Allocates the per-depth block buffers, configures the reader/writer
 * workers, and binds the global io_* function pointers to either the
 * mono-thread or the multi-thread implementation.
 *
 * If @parity_writer is set, the readers handle only the data disks and
 * the writers handle parity; otherwise the readers handle both data
 * and parity, and no writer is used.
 */
void io_init(struct snapraid_io* io, struct snapraid_state* state,
	unsigned io_cache, unsigned buffer_max,
	void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
	struct snapraid_handle* handle_map, unsigned handle_max,
	void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
	void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
	struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
{
	unsigned i;
	size_t allocated_size;
	size_t block_size = state->block_size;

	io->state = state;

	assert(buffer_max >= handle_max + parity_handle_max);

	/* initialize bandwidth limiting */
	bw_init(&io->bw, state->opt.bwlimit);

	/* set IO context in handles */
	for (i = 0; i < handle_max; ++i)
		handle_map[i].bw = &io->bw;
	for (i = 0; i < parity_handle_max; ++i)
		parity_handle_map[i].bw = &io->bw;

#if HAVE_THREAD
	if (io_cache == 0) {
		/* default is 16 MiB of cache */
		/* this seems to be a good tradeoff between speed and memory usage */
		io->io_max = 16 * 1024 * 1024 / state->block_size;
		if (io->io_max < IO_MIN)
			io->io_max = IO_MIN;
		if (io->io_max > IO_MAX)
			io->io_max = IO_MAX;
	} else {
		io->io_max = io_cache;
	}
#else
	(void)io_cache;

	/* without pthread force the mono thread mode */
	io->io_max = 1;
#endif

	assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));

	/* allocate one buffer vector per queue depth */
	io->buffer_max = buffer_max;
	allocated_size = 0;
	for (i = 0; i < io->io_max; ++i) {
		if (state->file_mode != ADVISE_DIRECT)
			io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
		else
			io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
		if (!state->opt.skip_self)
			mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
		allocated_size += block_size * buffer_max;
	}

	msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);

	/* split the handles between readers and writers */
	if (parity_writer) {
		io->reader_max = handle_max;
		io->writer_max = parity_handle_max;
	} else {
		io->reader_max = handle_max + parity_handle_max;
		io->writer_max = 0;
	}

	io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
	io->reader_list = malloc_nofail(io->reader_max + 1);
	io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
	io->writer_list = malloc_nofail(io->writer_max + 1);

	/* reader index ranges used by io_data_read/io_parity_read */
	io->data_base = 0;
	io->data_count = handle_max;
	io->parity_base = handle_max;
	io->parity_count = parity_handle_max;

	for (i = 0; i < io->reader_max; ++i) {
		struct snapraid_worker* worker = &io->reader_map[i];

		worker->io = io;

		if (i < handle_max) {
			/* it's a data read */
			worker->handle = &handle_map[i];
			worker->parity_handle = 0;
			worker->func = data_reader;

			/* data read is put in lower buffer index */
			worker->buffer_skew = 0;
		} else {
			/* it's a parity read */
			worker->handle = 0;
			worker->parity_handle = &parity_handle_map[i - handle_max];
			worker->func = parity_reader;

			/* parity read is put after data and computed parity */
			worker->buffer_skew = parity_handle_max;
		}
	}

	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];

		worker->io = io;

		/* it's a parity write */
		worker->handle = 0;
		worker->parity_handle = &parity_handle_map[i];
		worker->func = parity_writer;

		/* parity to write is put after data */
		worker->buffer_skew = handle_max;
	}

	/* bind the dispatch table to the selected implementation */
#if HAVE_THREAD
	if (io->io_max > 1) {
		io_read_next = io_read_next_thread;
		io_write_preset = io_write_preset_thread;
		io_write_next = io_write_next_thread;
		io_refresh = io_refresh_thread;
		io_flush = io_flush_thread;
		io_data_read = io_data_read_thread;
		io_parity_read = io_parity_read_thread;
		io_parity_write = io_parity_write_thread;
		io_start = io_start_thread;
		io_stop = io_stop_thread;

		thread_mutex_init(&io->io_mutex);
		thread_cond_init(&io->read_done);
		thread_cond_init(&io->read_sched);
		thread_cond_init(&io->write_done);
		thread_cond_init(&io->write_sched);
	} else
#endif
	{
		io_read_next = io_read_next_mono;
		io_write_preset = io_write_preset_mono;
		io_write_next = io_write_next_mono;
		io_refresh = io_refresh_mono;
		io_flush = io_flush_mono;
		io_data_read = io_data_read_mono;
		io_parity_read = io_parity_read_mono;
		io_parity_write = io_parity_write_mono;
		io_start = io_start_mono;
		io_stop = io_stop_mono;
	}
}
1049 :
1050 106 : void io_done(struct snapraid_io* io)
1051 : {
1052 : unsigned i;
1053 :
1054 13420 : for (i = 0; i < io->io_max; ++i) {
1055 13314 : free(io->buffer_map[i]);
1056 13314 : free(io->buffer_alloc_map[i]);
1057 : }
1058 :
1059 106 : free(io->reader_map);
1060 106 : free(io->reader_list);
1061 106 : free(io->writer_map);
1062 106 : free(io->writer_list);
1063 :
1064 : #if HAVE_THREAD
1065 106 : if (io->io_max > 1) {
1066 104 : thread_mutex_destroy(&io->io_mutex);
1067 104 : thread_cond_destroy(&io->read_done);
1068 104 : thread_cond_destroy(&io->read_sched);
1069 104 : thread_cond_destroy(&io->write_done);
1070 104 : thread_cond_destroy(&io->write_sched);
1071 : }
1072 : #endif
1073 106 : }
1074 :
|