/*
 * Copyright (C) 2016 Andrea Mazzoleni
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "portable.h"

#include "io.h"

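/**
 * I/O dispatch table.
 *
 * These function pointers select between the mono thread and the
 * multi thread implementation. They are all set by io_init(),
 * depending on the cache size and on pthread availability.
 */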
void (*io_start)(struct snapraid_io* io,
	block_off_t blockstart, block_off_t blockmax,
	bit_vect_t* block_enabled) = 0;
void (*io_stop)(struct snapraid_io* io) = 0;
block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
void (*io_refresh)(struct snapraid_io* io) = 0;
void (*io_flush)(struct snapraid_io* io) = 0;

/**
 * Get the next block position to process.
 */
static block_off_t io_position_next(struct snapraid_io* io)
{
	block_off_t blockcur;

	/* get the next position */
	if (io->block_enabled) {
		while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
			++io->block_next;
	}

	blockcur = io->block_next;

	/* next block for the next call */
	++io->block_next;

	return blockcur;
}

/**
 * Setup the next pending task for all readers.
 */
static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
{
	unsigned i;

	for (i = 0; i < io->reader_max; ++i) {
		struct snapraid_worker* worker = &io->reader_map[i];
		struct snapraid_task* task = &worker->task_map[task_index];

		/* setup the new pending task */
		if (blockcur < io->block_max)
			task->state = TASK_STATE_READY;
		else
			task->state = TASK_STATE_EMPTY;

		task->path[0] = 0;
		if (worker->handle)
			task->disk = worker->handle->disk;
		else
			task->disk = 0;

		assert(worker->buffer_skew + i < io->buffer_max);

		task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
		task->position = blockcur;
		task->block = 0;
		task->file = 0;
		task->file_pos = 0;
		task->read_size = 0;
		task->is_timestamp_different = 0;
	}
}

/**
 * Setup the next pending task for all writers.
 */
static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
{
	unsigned i;

	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];
		struct snapraid_task* task = &worker->task_map[task_index];

		/* setup the new pending task */
		task->state = TASK_STATE_READY;
		task->path[0] = 0;
		task->disk = 0;
		task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
		task->position = blockcur;
		task->block = 0;
		task->file = 0;
		task->file_pos = 0;
		task->read_size = 0;
		task->is_timestamp_different = 0;
	}
}

/**
 * Setup an empty next pending task for all writers.
 */
static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
{
	unsigned i;

	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];
		struct snapraid_task* task = &worker->task_map[task_index];

		/* setup the new pending task */
		task->state = TASK_STATE_EMPTY;
		task->path[0] = 0;
		task->disk = 0;
		task->buffer = 0;
		task->position = blockcur;
		task->block = 0;
		task->file = 0;
		task->file_pos = 0;
		task->read_size = 0;
		task->is_timestamp_different = 0;
	}
}

/*****************************************************************************/
/* mono thread */

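/**
 * Get the next block position to process, scheduling the read
 * in the single task slot used by the mono thread mode.
 */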
static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
{
	block_off_t blockcur_schedule;

	/* reset the index */
	io->reader_index = 0;

	blockcur_schedule = io_position_next(io);

	/* schedule the next read */
	io_reader_sched(io, 0, blockcur_schedule);

	/* set the buffer to use */
	*buffer = io->buffer_map[0];

	return blockcur_schedule;
}

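/**
 * Prepare the next write, clearing errors and scheduling the write,
 * or an empty task when skipped, in the single task slot.
 */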
static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
{
	unsigned i;

	/* reset the index */
	io->writer_index = 0;

	/* clear errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
		io->writer_error[i] = 0;

	if (skip) {
		/* skip the next write */
		io_writer_sched_empty(io, 0, blockcur);
	} else {
		/* schedule the next write */
		io_writer_sched(io, 0, blockcur);
	}
}

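/**
 * Complete the write cycle, reporting the writer errors to the caller.
 */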
static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
{
	unsigned i;

	(void)blockcur;
	(void)skip;

	/* report errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
		writer_error[i] = io->writer_error[i];
}

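/**
 * Refresh and flush are no-ops in mono thread mode, because every
 * read and write is executed synchronously in the calling thread.
 */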
static void io_refresh_mono(struct snapraid_io* io)
{
	(void)io;
}

static void io_flush_mono(struct snapraid_io* io)
{
	(void)io;
}

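/**
 * Execute the next pending read task, for either data or parity,
 * and return it with its worker position.
 */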
static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	struct snapraid_worker* worker;
	struct snapraid_task* task;
	unsigned i;

	/* get the next task */
	i = io->reader_index++;

	assert(base <= i && i < base + count);

	worker = &io->reader_map[i];
	task = &worker->task_map[0];

	/* do the work */
	if (task->state != TASK_STATE_EMPTY)
		worker->func(worker, task);

	/* return the position */
	*pos = i - base;

	/* store the waiting index */
	waiting_map[0] = i - base;
	*waiting_mac = 1;

	return task;
}

static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
}

static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
}

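/**
 * Execute the next pending parity write task.
 */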
static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	struct snapraid_worker* worker;
	struct snapraid_task* task;
	unsigned i;

	/* get the next task */
	i = io->writer_index++;

	worker = &io->writer_map[i];
	task = &worker->task_map[0];

	io->writer_error[i] = 0;

	/* do the work */
	if (task->state != TASK_STATE_EMPTY)
		worker->func(worker, task);

	/* return the position */
	*pos = i;

	/* store the waiting index */
	waiting_map[0] = i;
	*waiting_mac = 1;
}

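/**
 * Start the mono thread IO, just setting the block range to process.
 */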
static void io_start_mono(struct snapraid_io* io,
	block_off_t blockstart, block_off_t blockmax,
	bit_vect_t* block_enabled)
{
	io->block_start = blockstart;
	io->block_max = blockmax;
	io->block_enabled = block_enabled;
	io->block_next = blockstart;
}

static void io_stop_mono(struct snapraid_io* io)
{
	(void)io;
}

/*****************************************************************************/
/* multi thread */

/* disable multithread if pthread is not present */
#if HAVE_THREAD

/**
 * Get the next task to work on for a reader.
 *
 * This is the synchronization point for workers with the io.
 */
static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
{
	struct snapraid_io* io = worker->io;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned next_index;

		/* check if the worker has to exit */
		/* even if there is work to do */
		if (io->done) {
			thread_mutex_unlock(&io->io_mutex);
			return 0;
		}

		/* get the next pending task */
		next_index = (worker->index + 1) % io->io_max;

		/* if the queue of pending tasks is not empty */
		if (next_index != io->reader_index) {
			struct snapraid_task* task;

			/* the index that the IO may be waiting for */
			unsigned waiting_index = io->reader_index;

			/* the index that the worker just completed */
			unsigned done_index = worker->index;

			/* get the new working task */
			worker->index = next_index;
			task = &worker->task_map[worker->index];

			/* if the just completed task is at this index */
			if (done_index == waiting_index) {
				/* notify the IO that a new read is complete */
				thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
			} else {
				thread_mutex_unlock(&io->io_mutex);
			}

			/* return the new task */
			return task;
		}

		/* otherwise wait for a read_sched event */
		thread_cond_wait(&io->read_sched, &io->io_mutex);
	}
}

/**
 * Get the next task to work on for a writer.
 *
 * This is the synchronization point for workers with the io.
 */
static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
{
	struct snapraid_io* io = worker->io;
	int error_index;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* count the number of errors in the global state */
	error_index = state - IO_WRITER_ERROR_BASE;
	if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
		++io->writer_error[error_index];

	while (1) {
		unsigned next_index;

		/* get the next pending task */
		next_index = (worker->index + 1) % io->io_max;

		/* if the queue of pending tasks is not empty */
		if (next_index != io->writer_index) {
			struct snapraid_task* task;

			/* the index that the IO may be waiting for */
			unsigned waiting_index = (io->writer_index + 1) % io->io_max;

			/* the index that the worker just completed */
			unsigned done_index = worker->index;

			/* get the new working task */
			worker->index = next_index;
			task = &worker->task_map[worker->index];

			/* if the just completed task is at this index */
			if (done_index == waiting_index) {
				/* notify the IO that a new write is complete */
				thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
			} else {
				thread_mutex_unlock(&io->io_mutex);
			}

			/* return the new task */
			return task;
		}

		/* check if the worker has to exit */
		/* but only if there is no work to do */
		if (io->done) {
			thread_mutex_unlock(&io->io_mutex);
			return 0;
		}

		/* otherwise wait for a write_sched event */
		thread_cond_wait(&io->write_sched, &io->io_mutex);
	}
}

/**
 * Get the next block position to operate on.
 *
 * This is the synchronization point for workers with the io.
 */
static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
{
	block_off_t blockcur_schedule;
	block_off_t blockcur_caller;
	unsigned i;

	/* get the next parity position to process */
	blockcur_schedule = io_position_next(io);

	/* ensure that all data/parity was read */
	assert(io->reader_list[0] == io->reader_max);

	/* setup the list of workers to process */
	for (i = 0; i <= io->reader_max; ++i)
		io->reader_list[i] = i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* schedule the next read */
	io_reader_sched(io, io->reader_index, blockcur_schedule);

	/* set the index for the tasks to return to the caller */
	io->reader_index = (io->reader_index + 1) % io->io_max;

	/* get the position for the caller to operate on, taken from one of the tasks */
	blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;

	/* set the buffer to use */
	*buffer = io->buffer_map[io->reader_index];

	/* signal all the workers that there is a new pending task */
	thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);

	return blockcur_caller;
}

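/**
 * In the threaded mode presetting the write is a no-op,
 * as the whole write scheduling is done in io_write_next_thread().
 */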
static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
{
	(void)io;
	(void)blockcur;
	(void)skip;
}

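/**
 * Schedule the next write for all writers, report the accumulated
 * writer errors, and advance the writer index.
 */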
static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
{
	unsigned i;

	/* ensure that all parity was written */
	assert(io->writer_list[0] == io->writer_max);

	/* setup the list of workers to process */
	for (i = 0; i <= io->writer_max; ++i)
		io->writer_list[i] = i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* report errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
		writer_error[i] = io->writer_error[i];
		io->writer_error[i] = 0;
	}

	if (skip) {
		/* skip the next write */
		io_writer_sched_empty(io, io->writer_index, blockcur);
	} else {
		/* schedule the next write */
		io_writer_sched(io, io->writer_index, blockcur);
	}

	/* at this point the writers must be in sync with the readers */
	assert(io->writer_index == io->reader_index);

	/* set the index to be used for the next write */
	io->writer_index = (io->writer_index + 1) % io->io_max;

	/* signal all the workers that there is a new pending task */
	thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
}

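/**
 * Refresh the cached_blocks counters, storing for each reader how many
 * blocks it has already read ahead, and for each writer the matching
 * measure of its write queue.
 */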
static void io_refresh_thread(struct snapraid_io* io)
{
	unsigned i;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	/* for all readers, count the number of read blocks */
	for (i = 0; i < io->reader_max; ++i) {
		unsigned begin, end, cached;
		struct snapraid_worker* worker = &io->reader_map[i];

		/* the first block read */
		begin = io->reader_index + 1;
		/* the block in reading */
		end = worker->index;
		if (begin > end)
			end += io->io_max;
		cached = end - begin;

		if (worker->parity_handle)
			io->state->parity[worker->parity_handle->level].cached_blocks = cached;
		else
			worker->handle->disk->cached_blocks = cached;
	}

	/* for all writers, count the number of written blocks */
	/* note that this is a kind of "opposite" of cached blocks */
	for (i = 0; i < io->writer_max; ++i) {
		unsigned begin, end, cached;
		struct snapraid_worker* worker = &io->writer_map[i];

		/* the first block written */
		begin = io->writer_index + 1;
		/* the block in writing */
		end = worker->index;
		if (begin > end)
			end += io->io_max;
		cached = end - begin;

		io->state->parity[worker->parity_handle->level].cached_blocks = cached;
	}

	thread_mutex_unlock(&io->io_mutex);
}

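/**
 * Wait until all the writers have drained their pending tasks,
 * polling the queues and yielding the CPU between checks.
 */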
static void io_flush_thread(struct snapraid_io* io)
{
	unsigned i;

	while (1) {
		int all_done = 1;

		/* the synchronization is protected by the io mutex */
		thread_mutex_lock(&io->io_mutex);

		for (i = 0; i < io->writer_max; ++i) {
			struct snapraid_worker* worker = &io->writer_map[i];

			/* get the next pending task */
			unsigned next_index = (worker->index + 1) % io->io_max;

			/* if the queue of pending tasks is not empty */
			if (next_index != io->writer_index) {
				all_done = 0;
				break;
			}
		}

		thread_mutex_unlock(&io->io_mutex);

		if (all_done)
			break;

		/* wait for something to complete */
		thread_yield();
	}
}

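/**
 * Get the next completed read task in the [base, base+count) worker
 * range, waiting on the read_done condition if none has finished yet.
 * The waiting_map reports the workers that were waited for.
 */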
static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	unsigned waiting_cycle;

	/* count the waiting cycle */
	waiting_cycle = 0;

	/* clear the waiting indexes */
	*waiting_mac = 0;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned char* let;
		unsigned busy_index;

		/* get the index the IO is using */
		/* we must ensure that this index does not have a read in progress */
		/* to avoid concurrent access */
		busy_index = io->reader_index;

		/* search for a worker that has already finished */
		let = &io->reader_list[0];
		while (1) {
			unsigned i = *let;

			/* if we are at the end */
			if (i == io->reader_max)
				break;

			/* if it's in range */
			if (base <= i && i < base + count) {
				struct snapraid_worker* worker;

				/* if it's the first cycle */
				if (waiting_cycle == 0) {
					/* store the waiting indexes */
					waiting_map[(*waiting_mac)++] = i - base;
				}

				worker = &io->reader_map[i];

				/* if the worker has finished this index */
				if (busy_index != worker->index) {
					struct snapraid_task* task;

					task = &worker->task_map[io->reader_index];

					thread_mutex_unlock(&io->io_mutex);

					/* mark the worker as processed */
					/* setting the previous one to point at the next one */
					*let = io->reader_list[i + 1];

					/* return the position */
					*pos = i - base;

					/* on the first cycle, no one is waiting */
					if (waiting_cycle == 0)
						*waiting_mac = 0;

					return task;
				}
			}

			/* next position to check */
			let = &io->reader_list[i + 1];
		}

		/* if no worker is ready, wait for an event */
		thread_cond_wait(&io->read_done, &io->io_mutex);

		/* count the cycles */
		++waiting_cycle;
	}
}

static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
}

static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
}

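/**
 * Get the next completed parity write task, working like
 * io_task_read_thread() but on the writer workers.
 */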
static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
{
	unsigned waiting_cycle;

	/* count the waiting cycle */
	waiting_cycle = 0;

	/* clear the waiting indexes */
	*waiting_mac = 0;

	/* the synchronization is protected by the io mutex */
	thread_mutex_lock(&io->io_mutex);

	while (1) {
		unsigned char* let;
		unsigned busy_index;

		/* get the next index the IO is going to use */
		/* we must ensure that this index does not have a write in progress */
		/* to avoid concurrent access */
		/* note that we are already sure that a write is not in progress */
		/* at the index the IO is using right now */
		busy_index = (io->writer_index + 1) % io->io_max;

		/* search for a worker that has already finished */
		let = &io->writer_list[0];
		while (1) {
			unsigned i = *let;
			struct snapraid_worker* worker;

			/* if we are at the end */
			if (i == io->writer_max)
				break;

			/* if it's the first cycle */
			if (waiting_cycle == 0) {
				/* store the waiting indexes */
				waiting_map[(*waiting_mac)++] = i;
			}

			worker = &io->writer_map[i];

			/* the two indexes cannot be equal */
			assert(io->writer_index != worker->index);

			/* if the worker has finished this index */
			if (busy_index != worker->index) {
				thread_mutex_unlock(&io->io_mutex);

				/* mark the worker as processed */
				/* setting the previous one to point at the next one */
				*let = io->writer_list[i + 1];

				/* return the position */
				*pos = i;

				/* on the first cycle, no one is waiting */
				if (waiting_cycle == 0)
					*waiting_mac = 0;

				return;
			}

			/* next position to check */
			let = &io->writer_list[i + 1];
		}

		/* if no worker is ready, wait for an event */
		thread_cond_wait(&io->write_done, &io->io_mutex);

		/* count the cycles */
		++waiting_cycle;
	}
}

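/**
 * Execute a read task, completing it as empty if it's past the
 * end of the block range.
 */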
static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
{
	/* if we reached the end */
	if (task->position >= worker->io->block_max) {
		/* complete a dummy task */
		task->state = TASK_STATE_EMPTY;
	} else {
		worker->func(worker, task);
	}
}

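/**
 * Reader thread loop, processing tasks until io_reader_step()
 * returns no task.
 */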
static void* io_reader_thread(void* arg)
{
	struct snapraid_worker* worker = arg;

	/* force completion of the first task */
	io_reader_worker(worker, &worker->task_map[0]);

	while (1) {
		struct snapraid_task* task;

		/* get the new task */
		task = io_reader_step(worker);

		/* if no task, it means to exit */
		if (!task)
			break;

		/* nothing more to do */
		if (task->state == TASK_STATE_EMPTY)
			continue;

		assert(task->state == TASK_STATE_READY);

		/* work on the assigned task */
		io_reader_worker(worker, task);
	}

	return 0;
}

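/**
 * Writer thread loop, processing tasks and propagating the latest
 * task state back to io_writer_step() to account for errors.
 */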
static void* io_writer_thread(void* arg)
{
	struct snapraid_worker* worker = arg;
	int latest_state = TASK_STATE_DONE;

	while (1) {
		struct snapraid_task* task;

		/* get the new task */
		task = io_writer_step(worker, latest_state);

		/* if no task, it means to exit */
		if (!task)
			break;

		/* nothing more to do */
		if (task->state == TASK_STATE_EMPTY) {
			latest_state = TASK_STATE_DONE;
			continue;
		}

		assert(task->state == TASK_STATE_READY);

		/* work on the assigned task */
		worker->func(worker, task);

		/* save the resulting state */
		latest_state = task->state;
	}

	return 0;
}

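/**
 * Start the threaded IO, prefilling the read queue and spawning
 * the reader and writer threads.
 */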
static void io_start_thread(struct snapraid_io* io,
	block_off_t blockstart, block_off_t blockmax,
	bit_vect_t* block_enabled)
{
	unsigned i;
	tommy_node* j;

	/* enable the filesystem mutex in all disks */
	for (j = io->state->disklist; j != 0; j = j->next) {
		struct snapraid_disk* disk = j->data;
		disk_start_thread(disk);
	}

	io->block_start = blockstart;
	io->block_max = blockmax;
	io->block_enabled = block_enabled;
	io->block_next = blockstart;

	io->done = 0;
	io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
	io->writer_index = 0;

	/* clear writer errors */
	for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
		io->writer_error[i] = 0;

	/* setup the initial read pending tasks, except the last one, */
	/* which will be initialized at the first io_read_next() call */
	for (i = 0; i < io->io_max - 1; ++i) {
		block_off_t blockcur = io_position_next(io);

		io_reader_sched(io, i, blockcur);
	}

	/* setup the lists of workers to process */
	io->reader_list[0] = io->reader_max;
	for (i = 0; i <= io->writer_max; ++i)
		io->writer_list[i] = i;

	/* start the reader threads */
	for (i = 0; i < io->reader_max; ++i) {
		struct snapraid_worker* worker = &io->reader_map[i];

		worker->index = 0;

		thread_create(&worker->thread, io_reader_thread, worker);
	}

	/* start the writer threads */
	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];

		worker->index = io->io_max - 1;

		thread_create(&worker->thread, io_writer_thread, worker);
	}
}

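/**
 * Stop the threaded IO, waking up all the workers and joining
 * their threads.
 */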
static void io_stop_thread(struct snapraid_io* io)
{
	unsigned i;

	thread_mutex_lock(&io->io_mutex);

	/* mark that we are stopping */
	io->done = 1;

	/* signal all the threads to recognize the new state */
	thread_cond_broadcast(&io->read_sched);
	thread_cond_broadcast(&io->write_sched);

	thread_mutex_unlock(&io->io_mutex);

	/* wait for all readers to terminate */
	for (i = 0; i < io->reader_max; ++i) {
		struct snapraid_worker* worker = &io->reader_map[i];
		void* retval;

		/* wait for thread termination */
		thread_join(worker->thread, &retval);
	}

	/* wait for all writers to terminate */
	for (i = 0; i < io->writer_max; ++i) {
		struct snapraid_worker* worker = &io->writer_map[i];
		void* retval;

		/* wait for thread termination */
		thread_join(worker->thread, &retval);
	}
}

#endif

/*****************************************************************************/
/* global */

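/**
 * Initialize the IO engine, allocating the buffers and selecting the
 * mono thread or the multi thread implementation.
 *
 * A minimal sketch of the expected call sequence follows; it's
 * illustrative only, with arguments omitted, and the real loops
 * live in the callers of this module:
 *
 *   io_init(&io, ...);
 *   io_start(&io, blockstart, blockmax, block_enabled);
 *   for each block:
 *       blockcur = io_read_next(&io, &buffer);
 *       ... io_data_read()/io_parity_read() for each pending task ...
 *       io_write_preset(&io, blockcur, skip);
 *       ... io_parity_write() for each parity level ...
 *       io_write_next(&io, blockcur, skip, writer_error);
 *   io_flush(&io);
 *   io_stop(&io);
 *   io_done(&io);
 */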
906 :
907 97 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
908 : unsigned io_cache, unsigned buffer_max,
909 : void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
910 : struct snapraid_handle* handle_map, unsigned handle_max,
911 : void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
912 : void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
913 : struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
914 : {
915 : unsigned i;
916 : size_t allocated_size;
917 97 : size_t block_size = state->block_size;
918 :
919 97 : io->state = state;
920 :
921 97 : assert(buffer_max >= handle_max + parity_handle_max);
922 :
923 : /* initialize bandwidth limiting */
924 97 : bw_init(&io->bw, state->opt.bwlimit);
925 :
926 : /* set IO context in handles */
927 679 : for (i = 0; i < handle_max; ++i)
928 582 : handle_map[i].bw = &io->bw;
929 644 : for (i = 0; i < parity_handle_max; ++i)
930 547 : parity_handle_map[i].bw = &io->bw;
931 :
932 : #if HAVE_THREAD
933 97 : if (io_cache == 0) {
934 : /* default is 16 MiB of cache */
935 : /* this seems to be a good tradeoff between speed and memory usage */
936 96 : io->io_max = 16 * 1024 * 1024 / state->block_size;
937 96 : if (io->io_max < IO_MIN)
938 0 : io->io_max = IO_MIN;
939 96 : if (io->io_max > IO_MAX)
940 96 : io->io_max = IO_MAX;
941 : } else {
942 1 : io->io_max = io_cache;
943 : }
944 : #else
945 : (void)io_cache;
946 :
947 : /* without pthread force the mono thread mode */
948 : io->io_max = 1;
949 : #endif
950 :
951 97 : assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
952 :
953 97 : io->buffer_max = buffer_max;
954 97 : allocated_size = 0;
955 12386 : for (i = 0; i < io->io_max; ++i) {
956 12289 : if (state->file_mode != ADVISE_DIRECT)
957 12289 : io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
958 : else
959 0 : io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
960 12289 : if (!state->opt.skip_self)
961 0 : mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
962 12289 : allocated_size += block_size * buffer_max;
963 : }
964 :
965 97 : msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);
966 :
967 97 : if (parity_writer) {
968 83 : io->reader_max = handle_max;
969 83 : io->writer_max = parity_handle_max;
970 : } else {
971 14 : io->reader_max = handle_max + parity_handle_max;
972 14 : io->writer_max = 0;
973 : }
974 :
975 97 : io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
976 97 : io->reader_list = malloc_nofail(io->reader_max + 1);
977 97 : io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
978 97 : io->writer_list = malloc_nofail(io->writer_max + 1);
979 :
980 97 : io->data_base = 0;
981 97 : io->data_count = handle_max;
982 97 : io->parity_base = handle_max;
983 97 : io->parity_count = parity_handle_max;
984 :
985 763 : for (i = 0; i < io->reader_max; ++i) {
986 666 : struct snapraid_worker* worker = &io->reader_map[i];
987 :
988 666 : worker->io = io;
989 :
990 666 : if (i < handle_max) {
991 : /* it's a data read */
992 582 : worker->handle = &handle_map[i];
993 582 : worker->parity_handle = 0;
994 582 : worker->func = data_reader;
995 :
996 : /* data read is put in lower buffer index */
997 582 : worker->buffer_skew = 0;
998 : } else {
999 : /* it's a parity read */
1000 84 : worker->handle = 0;
1001 84 : worker->parity_handle = &parity_handle_map[i - handle_max];
1002 84 : worker->func = parity_reader;
1003 :
1004 : /* parity read is put after data and computed parity */
1005 84 : worker->buffer_skew = parity_handle_max;
1006 : }
1007 : }
1008 :
1009 560 : for (i = 0; i < io->writer_max; ++i) {
1010 463 : struct snapraid_worker* worker = &io->writer_map[i];
1011 :
1012 463 : worker->io = io;
1013 :
1014 : /* it's a parity write */
1015 463 : worker->handle = 0;
1016 463 : worker->parity_handle = &parity_handle_map[i];
1017 463 : worker->func = parity_writer;
1018 :
1019 : /* parity to write is put after data */
1020 463 : worker->buffer_skew = handle_max;
1021 : }
1022 :
1023 : #if HAVE_THREAD
1024 97 : if (io->io_max > 1) {
1025 96 : io_read_next = io_read_next_thread;
1026 96 : io_write_preset = io_write_preset_thread;
1027 96 : io_write_next = io_write_next_thread;
1028 96 : io_refresh = io_refresh_thread;
1029 96 : io_flush = io_flush_thread;
1030 96 : io_data_read = io_data_read_thread;
1031 96 : io_parity_read = io_parity_read_thread;
1032 96 : io_parity_write = io_parity_write_thread;
1033 96 : io_start = io_start_thread;
1034 96 : io_stop = io_stop_thread;
1035 :
1036 96 : thread_mutex_init(&io->io_mutex);
1037 96 : thread_cond_init(&io->read_done);
1038 96 : thread_cond_init(&io->read_sched);
1039 96 : thread_cond_init(&io->write_done);
1040 96 : thread_cond_init(&io->write_sched);
1041 : } else
1042 : #endif
1043 : {
1044 1 : io_read_next = io_read_next_mono;
1045 1 : io_write_preset = io_write_preset_mono;
1046 1 : io_write_next = io_write_next_mono;
1047 1 : io_refresh = io_refresh_mono;
1048 1 : io_flush = io_flush_mono;
1049 1 : io_data_read = io_data_read_mono;
1050 1 : io_parity_read = io_parity_read_mono;
1051 1 : io_parity_write = io_parity_write_mono;
1052 1 : io_start = io_start_mono;
1053 1 : io_stop = io_stop_mono;
1054 : }
1055 97 : }
1056 :
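/**
 * Deinitialize the IO engine, freeing the buffers, the worker maps
 * and the synchronization primitives.
 */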
void io_done(struct snapraid_io* io)
{
	unsigned i;

	for (i = 0; i < io->io_max; ++i) {
		free(io->buffer_map[i]);
		free(io->buffer_alloc_map[i]);
	}

	free(io->reader_map);
	free(io->reader_list);
	free(io->writer_map);
	free(io->writer_list);

#if HAVE_THREAD
	if (io->io_max > 1) {
		thread_mutex_destroy(&io->io_mutex);
		thread_cond_destroy(&io->read_done);
		thread_cond_destroy(&io->read_sched);
		thread_cond_destroy(&io->write_done);
		thread_cond_destroy(&io->write_sched);
	}
#endif
}