Line data Source code
1 : /*
2 : * Copyright (C) 2016 Andrea Mazzoleni
3 : *
4 : * This program is free software: you can redistribute it and/or modify
5 : * it under the terms of the GNU General Public License as published by
6 : * the Free Software Foundation, either version 3 of the License, or
7 : * (at your option) any later version.
8 : *
9 : * This program is distributed in the hope that it will be useful,
10 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 : * GNU General Public License for more details.
13 : *
14 : * You should have received a copy of the GNU General Public License
15 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 : */
17 :
18 : #include "portable.h"
19 :
20 : #include "io.h"
21 :
22 : /**
23 : * Get the next block position to process.
   : *
   : * Skips positions for which block_is_enabled() reports false, and always
   : * advances block_next past the returned position; once the range is
   : * exhausted the returned value is >= block_max (acts as an end marker).
24 : */
25 162051 : static block_off_t io_position_next(struct snapraid_io* io)
26 : {
27 : block_off_t blockcur;
28 :
29 : /* get the next position */
30 647484 : while (io->block_next < io->block_max && !io->block_is_enabled(io->block_arg, io->block_next))
31 323382 : ++io->block_next;
32 :
33 162051 : blockcur = io->block_next;
34 :
35 : /* next block for the next call */
36 162051 : ++io->block_next;
37 :
38 162051 : return blockcur;
39 : }
40 :
41 : /**
42 : * Setup the next pending task for all readers.
   : *
   : * Tasks at a position past block_max are marked TASK_STATE_EMPTY so that
   : * workers skip them; in-range tasks are marked TASK_STATE_READY.
43 : */
44 162051 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
45 : {
46 : unsigned i;
47 :
48 1355145 : for (i = 0; i < io->reader_max; ++i) {
49 1193094 : struct snapraid_worker* worker = &io->reader_map[i];
50 1193094 : struct snapraid_task* task = &worker->task_map[task_index];
51 :
52 : /* setup the new pending task */
53 1193094 : if (blockcur < io->block_max)
54 1110144 : task->state = TASK_STATE_READY;
55 : else
56 82950 : task->state = TASK_STATE_EMPTY;
57 :
58 1193094 : task->path[0] = 0;
   : /* a data reader has a disk handle; a parity reader has none */
59 1193094 : if (worker->handle)
60 972306 : task->disk = worker->handle->disk;
61 : else
62 220788 : task->disk = 0;
63 1193094 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
64 1193094 : task->position = blockcur;
65 1193094 : task->block = 0;
66 1193094 : task->file = 0;
67 1193094 : task->file_pos = 0;
68 1193094 : task->read_size = 0;
69 1193094 : task->is_timestamp_different = 0;
70 : }
71 162051 : }
72 :
73 : /**
74 : * Setup the next pending task for all writers.
   : *
   : * Unlike the readers, writer tasks are always marked TASK_STATE_READY;
   : * the caller uses io_writer_sched_empty() instead when the write must
   : * be skipped.
75 : */
76 112360 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
77 : {
78 : unsigned i;
79 :
80 656415 : for (i = 0; i < io->writer_max; ++i) {
81 544055 : struct snapraid_worker* worker = &io->writer_map[i];
82 544055 : struct snapraid_task* task = &worker->task_map[task_index];
83 :
84 : /* setup the new pending task */
85 544055 : task->state = TASK_STATE_READY;
86 544055 : task->path[0] = 0;
87 544055 : task->disk = 0;
88 544055 : task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
89 544055 : task->position = blockcur;
90 544055 : task->block = 0;
91 544055 : task->file = 0;
92 544055 : task->file_pos = 0;
93 544055 : task->read_size = 0;
94 544055 : task->is_timestamp_different = 0;
95 : }
96 112360 : }
97 :
98 : /**
99 : * Setup an empty next pending task for all writers.
   : *
   : * The task is marked TASK_STATE_EMPTY and gets no buffer because no
   : * write will take place for this block position.
100 : */
101 2396 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
102 : {
103 : unsigned i;
104 :
105 16772 : for (i = 0; i < io->writer_max; ++i) {
106 14376 : struct snapraid_worker* worker = &io->writer_map[i];
107 14376 : struct snapraid_task* task = &worker->task_map[task_index];
108 :
109 : /* setup the new pending task */
110 14376 : task->state = TASK_STATE_EMPTY;
111 14376 : task->path[0] = 0;
112 14376 : task->disk = 0;
113 14376 : task->buffer = 0;
114 14376 : task->position = blockcur;
115 14376 : task->block = 0;
116 14376 : task->file = 0;
117 14376 : task->file_pos = 0;
118 14376 : task->read_size = 0;
119 14376 : task->is_timestamp_different = 0;
120 : }
121 2396 : }
122 :
123 : /*****************************************************************************/
124 : /* mono thread */
125 :
   : /**
   : * Schedule the next read in mono (single thread) mode.
   : *
   : * Returns the block position that the caller has to process,
   : * and sets *buffer to the buffer set to use for it.
   : */
126 4688 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
127 : {
128 : block_off_t blockcur_schedule;
129 :
130 : /* reset the index */
131 4688 : io->reader_index = 0;
132 :
133 4688 : blockcur_schedule = io_position_next(io);
134 :
135 : /* schedule the next read */
136 4688 : io_reader_sched(io, 0, blockcur_schedule);
137 :
138 : /* set the buffer to use */
   : /* mono mode has a single task slot, hence index 0 */
139 4688 : *buffer = io->buffer_map[0];
140 :
141 4688 : return blockcur_schedule;
142 : }
143 :
   : /**
   : * Prepare the next write in mono mode.
   : *
   : * Clears the pending writer errors and schedules the write at position
   : * blockcur, or an empty task if 'skip' is set.
   : */
144 4687 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
145 : {
146 : unsigned i;
147 :
148 : /* reset the index */
149 4687 : io->writer_index = 0;
150 :
151 : /* clear errors */
152 23435 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
153 18748 : io->writer_error[i] = 0;
154 :
155 4687 : if (skip) {
156 : /* skip the next write */
157 0 : io_writer_sched_empty(io, 0, blockcur);
158 : } else {
159 : /* schedule the next write */
160 4687 : io_writer_sched(io, 0, blockcur);
161 : }
162 4687 : }
163 :
   : /**
   : * Complete the write in mono mode.
   : *
   : * Only reports the accumulated writer errors into writer_error[];
   : * blockcur and skip are unused because the write was already done
   : : * synchronously by io_parity_write_mono().
   : */
164 4687 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
165 : {
166 : unsigned i;
167 :
168 : (void)blockcur;
169 : (void)skip;
170 :
171 : /* report errors */
172 23435 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
173 18748 : writer_error[i] = io->writer_error[i];
174 4687 : }
175 :
   : /**
   : * Refresh the cache statistics: nothing to do in mono mode.
   : */
176 0 : static void io_refresh_mono(struct snapraid_io* io)
177 : {
178 : (void)io;
179 0 : }
180 :
   : /**
   : * Execute and return the next reader task in mono mode.
   : *
   : * base/count select either the data or the parity range of the reader
   : * workers. The task is executed synchronously in the caller's thread.
   : * Returns the completed task; *pos is set to its index relative to base,
   : * and the waiting map reports the single index just processed.
   : */
181 28122 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
182 : {
183 : struct snapraid_worker* worker;
184 : struct snapraid_task* task;
185 : unsigned i;
186 :
187 : /* get the next task */
188 28122 : i = io->reader_index++;
189 :
190 28122 : assert(base <= i && i < base + count);
191 :
192 28122 : worker = &io->reader_map[i];
193 28122 : task = &worker->task_map[0];
194 :
195 : /* do the work */
196 28122 : if (task->state != TASK_STATE_EMPTY)
197 28122 : worker->func(worker, task);
198 :
199 : /* return the position */
200 28122 : *pos = i - base;
201 :
202 : /* store the waiting index */
203 28122 : waiting_map[0] = i - base;
204 28122 : *waiting_mac = 1;
205 :
206 28122 : return task;
207 : }
208 :
   : /* Read the next data block in mono mode (data range of the readers). */
209 28122 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
210 : {
211 28122 : return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
212 : }
213 :
   : /* Read the next parity block in mono mode (parity range of the readers). */
214 0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
215 : {
216 0 : return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
217 : }
218 :
   : /**
   : * Execute the next writer task in mono mode.
   : *
   : * The write is done synchronously in the caller's thread; *pos and the
   : * waiting map report the writer index just processed.
   : */
219 4687 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
220 : {
221 : struct snapraid_worker* worker;
222 : struct snapraid_task* task;
223 : unsigned i;
224 :
225 : /* get the next task */
226 4687 : i = io->writer_index++;
227 :
228 4687 : worker = &io->writer_map[i];
229 4687 : task = &worker->task_map[0];
230 :
   : /* NOTE(review): writer_error is indexed here by writer index, while */
   : /* elsewhere it's indexed by error kind — assumes writer_max <= */
   : /* IO_WRITER_ERROR_MAX; verify against the struct declaration */
231 4687 : io->writer_error[i] = 0;
232 :
233 : /* do the work */
234 4687 : if (task->state != TASK_STATE_EMPTY)
235 4687 : worker->func(worker, task);
236 :
237 : /* return the position */
238 4687 : *pos = i;
239 :
240 : /* store the waiting index */
241 4687 : waiting_map[0] = i;
242 4687 : *waiting_mac = 1;
243 4687 : }
244 :
   : /**
   : * Start mono mode IO over the [blockstart, blockmax) position range,
   : * filtered by the block_is_enabled() callback.
   : */
245 1 : static void io_start_mono(struct snapraid_io* io,
246 : block_off_t blockstart, block_off_t blockmax,
247 : int (*block_is_enabled)(void* arg, block_off_t), void* blockarg)
248 : {
249 1 : io->block_start = blockstart;
250 1 : io->block_max = blockmax;
251 1 : io->block_is_enabled = block_is_enabled;
252 1 : io->block_arg = blockarg;
253 1 : io->block_next = blockstart;
254 1 : }
255 :
   : /* Stop mono mode IO: nothing to do, no thread was started. */
256 1 : static void io_stop_mono(struct snapraid_io* io)
257 : {
258 : (void)io;
259 1 : }
260 :
261 : /*****************************************************************************/
262 : /* multi thread */
263 :
264 : /* disable multithread if pthread is not present */
265 : #if HAVE_PTHREAD
266 :
267 : /**
268 : * Get the next task to work on for a reader.
269 : *
270 : * This is the synchronization point for workers with the io.
   : *
   : * Returns the next task to process, or 0 when the worker has to exit.
   : * The worker signals read_done only when it just completed the index the
   : * IO thread may be waiting for, to avoid useless wakeups.
271 : */
272 1155859 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
273 : {
274 1155859 : struct snapraid_io* io = worker->io;
275 :
276 : /* the synchronization is protected by the io mutex */
277 1155859 : thread_mutex_lock(&io->io_mutex);
278 :
279 : while (1) {
280 : unsigned next_index;
281 :
282 : /* check if the worker has to exit */
283 : /* even if there is work to do */
284 2044739 : if (io->done) {
285 648 : thread_mutex_unlock(&io->io_mutex);
286 648 : return 0;
287 : }
288 :
289 : /* get the next pending task */
   : /* task indexes form a circular queue of io_max slots */
290 2044091 : next_index = (worker->index + 1) % io->io_max;
291 :
292 : /* if the queue of pending tasks is not empty */
293 2044091 : if (next_index != io->reader_index) {
294 : struct snapraid_task* task;
295 :
296 : /* the index that the IO may be waiting for */
297 1163889 : unsigned waiting_index = io->reader_index;
298 :
299 : /* the index that worker just completed */
300 1163889 : unsigned done_index = worker->index;
301 :
302 : /* get the new working task */
303 1163889 : worker->index = next_index;
304 1163889 : task = &worker->task_map[worker->index];
305 :
306 : /* if the just completed task is at this index */
307 1163889 : if (done_index == waiting_index) {
308 : /* notify the IO that a new read is complete */
309 2053 : thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
310 : } else {
311 1161836 : thread_mutex_unlock(&io->io_mutex);
312 : }
313 :
314 : /* return the new task */
315 1163286 : return task;
316 : }
317 :
318 : /* otherwise wait for a read_sched event */
319 880202 : thread_cond_wait(&io->read_sched, &io->io_mutex);
320 880202 : }
321 : }
322 :
323 : /**
324 : * Get the next task to work on for a writer.
325 : *
326 : * This is the synchronization point for workers with the io.
   : *
   : * 'state' is the result of the previous task; error states in the
   : * [IO_WRITER_ERROR_BASE, IO_WRITER_ERROR_BASE + IO_WRITER_ERROR_MAX)
   : * range are accumulated into the global writer_error counters.
   : * Returns the next task, or 0 when the worker has to exit.
327 : */
328 550861 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
329 : {
330 550861 : struct snapraid_io* io = worker->io;
331 : int error_index;
332 :
333 : /* the synchronization is protected by the io mutex */
334 550861 : thread_mutex_lock(&io->io_mutex);
335 :
336 : /* counts the number of errors in the global state */
337 554206 : error_index = state - IO_WRITER_ERROR_BASE;
338 554206 : if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
339 0 : ++io->writer_error[error_index];
340 :
341 : while (1) {
342 : unsigned next_index;
343 :
344 : /* get the next pending task */
345 980523 : next_index = (worker->index + 1) % io->io_max;
346 :
347 : /* if the queue of pending tasks is not empty */
348 980523 : if (next_index != io->writer_index) {
349 : struct snapraid_task* task;
350 :
351 : /* the index that the IO may be waiting for */
   : /* writers run one slot behind readers, hence the +1 */
352 553744 : unsigned waiting_index = (io->writer_index + 1) % io->io_max;
353 :
354 : /* the index that worker just completed */
355 553744 : unsigned done_index = worker->index;
356 :
357 : /* get the new working task */
358 553744 : worker->index = next_index;
359 553744 : task = &worker->task_map[worker->index];
360 :
361 : /* if the just completed task is at this index */
362 553744 : if (done_index == waiting_index) {
363 : /* notify the IO that a new write is complete */
364 16 : thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
365 : } else {
366 553728 : thread_mutex_unlock(&io->io_mutex);
367 : }
368 :
369 : /* return the new task */
370 552907 : return task;
371 : }
372 :
373 : /* check if the worker has to exit */
374 : /* but only if there is no work to do */
375 426779 : if (io->done) {
376 462 : thread_mutex_unlock(&io->io_mutex);
377 462 : return 0;
378 : }
379 :
380 : /* otherwise wait for a write_sched event */
381 426317 : thread_cond_wait(&io->write_sched, &io->io_mutex);
382 426317 : }
383 : }
384 :
385 : /**
386 : * Get the next block position to operate on.
387 : *
388 : * This is the synchronization point for workers with the io.
   : *
   : * Note that the position returned to the caller is the one of the task
   : * slot just made current, which is an older position than the one just
   : * scheduled (the queue keeps io_max positions in flight).
389 : */
390 145298 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
391 : {
392 : block_off_t blockcur_schedule;
393 : block_off_t blockcur_caller;
394 : unsigned i;
395 :
396 : /* get the next parity position to process */
397 145298 : blockcur_schedule = io_position_next(io);
398 :
399 : /* ensure that all data/parity was read */
400 145298 : assert(io->reader_list[0] == io->reader_max);
401 :
402 : /* setup the list of workers to process */
403 1373266 : for (i = 0; i <= io->reader_max; ++i)
404 1227968 : io->reader_list[i] = i;
405 :
406 : /* the synchronization is protected by the io mutex */
407 145298 : thread_mutex_lock(&io->io_mutex);
408 :
409 : /* schedule the next read */
410 145298 : io_reader_sched(io, io->reader_index, blockcur_schedule);
411 :
412 : /* set the index for the tasks to return to the caller */
413 145298 : io->reader_index = (io->reader_index + 1) % io->io_max;
414 :
415 : /* get the position to operate at high level from one task */
416 145298 : blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
417 :
418 : /* set the buffer to use */
419 145298 : *buffer = io->buffer_map[io->reader_index];
420 :
421 : /* signal all the workers that there is a new pending task */
422 145298 : thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
423 :
424 145298 : return blockcur_caller;
425 : }
426 :
   : /**
   : * Nothing to preset in thread mode; the write is fully scheduled
   : * later in io_write_next_thread().
   : */
427 110069 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
428 : {
429 : (void)io;
430 : (void)blockcur;
431 : (void)skip;
432 110069 : }
433 :
   : /**
   : * Schedule the next write (or an empty task if 'skip' is set) and
   : * report, then reset, the accumulated writer errors.
   : */
434 110069 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
435 : {
436 : unsigned i;
437 :
438 : /* ensure that all parity was written */
439 110069 : assert(io->writer_list[0] == io->writer_max);
440 :
441 : /* setup the list of workers to process */
442 773882 : for (i = 0; i <= io->writer_max; ++i)
443 663813 : io->writer_list[i] = i;
444 :
445 : /* the synchronization is protected by the io mutex */
446 110069 : thread_mutex_lock(&io->io_mutex);
447 :
448 : /* report errors */
449 550345 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
450 440276 : writer_error[i] = io->writer_error[i];
451 440276 : io->writer_error[i] = 0;
452 : }
453 :
454 110069 : if (skip) {
455 : /* skip the next write */
456 2396 : io_writer_sched_empty(io, io->writer_index, blockcur);
457 : } else {
458 : /* schedule the next write */
459 107673 : io_writer_sched(io, io->writer_index, blockcur);
460 : }
461 :
462 : /* at this point the writers must be in sync with the readers */
463 110069 : assert(io->writer_index == io->reader_index);
464 :
465 : /* set the index to be used for the next write */
466 110069 : io->writer_index = (io->writer_index + 1) % io->io_max;
467 :
468 : /* signal all the workers that there is a new pending task */
469 110069 : thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
470 110069 : }
471 :
   : /**
   : * Update the per-disk/per-parity cached-block counters, presumably used
   : * for progress/status reporting (the counters live in io->state).
   : */
472 1 : static void io_refresh_thread(struct snapraid_io* io)
473 : {
474 : unsigned i;
475 :
476 : /* the synchronization is protected by the io mutex */
477 1 : thread_mutex_lock(&io->io_mutex);
478 :
479 : /* for all readers, count the number of read blocks */
480 7 : for (i = 0; i < io->reader_max; ++i) {
481 : unsigned begin, end, cached;
482 6 : struct snapraid_worker* worker = &io->reader_map[i];
483 :
484 : /* the first block read */
485 6 : begin = io->reader_index + 1;
486 : /* the block in reading */
487 6 : end = worker->index;
   : /* unwrap the circular distance over the io_max slots */
488 6 : if (begin > end)
489 6 : end += io->io_max;
490 6 : cached = end - begin;
491 :
492 6 : if (worker->parity_handle)
493 0 : io->state->parity[worker->parity_handle->level].cached = cached;
494 : else
495 6 : worker->handle->disk->cached = cached;
496 : }
497 :
498 : /* for all writers, count the number of written blocks */
499 : /* note that this is a kind of "opposite" of cached blocks */
500 2 : for (i = 0; i < io->writer_max; ++i) {
501 : unsigned begin, end, cached;
502 1 : struct snapraid_worker* worker = &io->writer_map[i];
503 :
504 : /* the first block written */
505 1 : begin = io->writer_index + 1;
506 : /* the block in writing */
507 1 : end = worker->index;
508 1 : if (begin > end)
509 1 : end += io->io_max;
510 1 : cached = end - begin;
511 :
512 1 : io->state->parity[worker->parity_handle->level].cached = cached;
513 : }
514 :
515 1 : thread_mutex_unlock(&io->io_mutex);
516 1 : }
517 :
   : /**
   : * Wait for, and return, a completed reader task in [base, base+count).
   : *
   : * reader_list is used as a singly-linked list of still-unprocessed
   : * workers (each entry points at the next candidate index); a completed
   : * worker is unlinked before its task is returned. waiting_map/mac
   : * report which workers the IO had to wait for (empty if none).
   : */
518 1082022 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
519 : {
520 : unsigned waiting_cycle;
521 :
522 : /* count the waiting cycle */
523 1082022 : waiting_cycle = 0;
524 :
525 : /* clear the waiting indexes */
526 1082022 : *waiting_mac = 0;
527 :
528 : /* the synchronization is protected by the io mutex */
529 1082022 : thread_mutex_lock(&io->io_mutex);
530 :
531 : while (1) {
532 : unsigned char* let;
533 : unsigned busy_index;
534 :
535 : /* get the index the IO is using */
536 : /* we must ensure that this index has not a read in progress */
537 : /* to avoid a concurrent access */
538 1083866 : busy_index = io->reader_index;
539 :
540 : /* search for a worker that has already finished */
541 1083866 : let = &io->reader_list[0];
542 : while (1) {
543 1091735 : unsigned i = *let;
544 :
545 : /* if we are at the end */
546 1091735 : if (i == io->reader_max)
547 1844 : break;
548 :
549 : /* if it's in range */
550 1089891 : if (base <= i && i < base + count) {
551 : struct snapraid_worker* worker;
552 :
553 : /* if it's the first cycle */
554 1088511 : if (waiting_cycle == 0) {
555 : /* store the waiting indexes */
556 1086614 : waiting_map[(*waiting_mac)++] = i - base;
557 : }
558 :
559 1088511 : worker = &io->reader_map[i];
560 :
561 : /* if the worker has finished this index */
562 1088511 : if (busy_index != worker->index) {
563 : struct snapraid_task* task;
564 :
565 1082022 : task = &worker->task_map[io->reader_index];
566 :
   : /* reader_list is only touched by the IO thread, */
   : /* so it can be updated after releasing the mutex */
567 1082022 : thread_mutex_unlock(&io->io_mutex);
568 :
569 : /* mark the worker as processed */
570 : /* setting the previous one to point at the next one */
571 1082022 : *let = io->reader_list[i + 1];
572 :
573 : /* return the position */
574 1082022 : *pos = i - base;
575 :
576 : /* on the first cycle, no one is waiting */
577 1082022 : if (waiting_cycle == 0)
578 1080193 : *waiting_mac = 0;
579 :
580 2164044 : return task;
581 : }
582 : }
583 :
584 : /* next position to check */
585 7869 : let = &io->reader_list[i + 1];
586 7869 : }
587 :
588 : /* if no worker is ready, wait for an event */
589 1844 : thread_cond_wait(&io->read_done, &io->io_mutex);
590 :
591 : /* count the cycles */
592 1844 : ++waiting_cycle;
593 1844 : }
594 : }
595 :
   : /* Wait for a completed data read (data range of the readers). */
596 871218 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
597 : {
598 871218 : return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
599 : }
600 :
   : /* Wait for a completed parity read (parity range of the readers). */
601 210804 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
602 : {
603 210804 : return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
604 : }
605 :
   : /**
   : * Wait for a completed parity write.
   : *
   : * Like io_task_read_thread(), but over the writer workers: writer_list
   : * is the linked list of still-unprocessed writers, and the completed
   : * one is unlinked before returning its index in *pos.
   : */
606 553744 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
607 : {
608 : unsigned waiting_cycle;
609 :
610 : /* count the waiting cycle */
611 553744 : waiting_cycle = 0;
612 :
613 : /* clear the waiting indexes */
614 553744 : *waiting_mac = 0;
615 :
616 : /* the synchronization is protected by the io mutex */
617 553744 : thread_mutex_lock(&io->io_mutex);
618 :
619 : while (1) {
620 : unsigned char* let;
621 : unsigned busy_index;
622 :
623 : /* get the next index the IO is going to use */
624 : /* we must ensure that this index has not a write in progress */
625 : /* to avoid a concurrent access */
626 : /* note that we are already sure that a write is not in progress */
627 : /* at the index the IO is using at now */
628 553754 : busy_index = (io->writer_index + 1) % io->io_max;
629 :
630 : /* search for a worker that has already finished */
631 553754 : let = &io->writer_list[0];
632 : while (1) {
633 553785 : unsigned i = *let;
634 : struct snapraid_worker* worker;
635 :
636 : /* if we are at the end */
637 553785 : if (i == io->writer_max)
638 10 : break;
639 :
640 : /* if it's the first cycle */
641 553775 : if (waiting_cycle == 0) {
642 : /* store the waiting indexes */
643 553764 : waiting_map[(*waiting_mac)++] = i;
644 : }
645 :
646 553775 : worker = &io->writer_map[i];
647 :
648 : /* the two indexes cannot be equal */
649 553775 : assert(io->writer_index != worker->index);
650 :
651 : /* if the worker has finished this index */
652 553775 : if (busy_index != worker->index) {
653 553744 : thread_mutex_unlock(&io->io_mutex);
654 :
655 : /* mark the worker as processed */
656 : /* setting the previous one to point at the next one */
657 553744 : *let = io->writer_list[i + 1];
658 :
659 : /* return the position */
660 553744 : *pos = i;
661 :
662 : /* on the first cycle, no one is waiting */
663 553744 : if (waiting_cycle == 0)
664 553734 : *waiting_mac = 0;
665 :
666 1107488 : return;
667 : }
668 :
669 : /* next position to check */
670 31 : let = &io->writer_list[i + 1];
671 31 : }
672 :
673 : /* if no worker is ready, wait for an event */
674 10 : thread_cond_wait(&io->write_done, &io->io_mutex);
675 :
676 : /* count the cycles */
677 10 : ++waiting_cycle;
678 10 : }
679 : }
680 :
   : /**
   : * Run one reader task, completing it as a dummy EMPTY task when its
   : * position is past the end of the processed range.
   : */
681 1081517 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
682 : {
683 : /* if we reached the end */
684 1081517 : if (task->position >= worker->io->block_max) {
685 : /* complete a dummy task */
686 118 : task->state = TASK_STATE_EMPTY;
687 : } else {
688 1081399 : worker->func(worker, task);
689 : }
690 1070843 : }
691 :
   : /**
   : * Reader thread entry point: completes the first pre-scheduled task,
   : * then loops fetching and executing tasks until io_reader_step()
   : * returns 0 at termination.
   : */
692 639 : static void* io_reader_thread(void* arg)
693 : {
694 639 : struct snapraid_worker* worker = arg;
695 :
696 : /* force completion of the first task */
697 639 : io_reader_worker(worker, &worker->task_map[0]);
698 :
699 : while (1) {
700 : struct snapraid_task* task;
701 :
702 : /* get the new task */
703 1159719 : task = io_reader_step(worker);
704 :
705 : /* if no task, it means to exit */
706 1163823 : if (!task)
707 645 : break;
708 :
709 : /* nothing more to do */
710 1163178 : if (task->state == TASK_STATE_EMPTY)
711 82346 : continue;
712 :
713 1080832 : assert(task->state == TASK_STATE_READY);
714 :
715 : /* work on the assigned task */
716 1080832 : io_reader_worker(worker, task);
717 1159083 : }
718 :
719 645 : return 0;
720 : }
721 :
   : /**
   : * Writer thread entry point: loops fetching and executing write tasks,
   : * feeding the state of the previous task back into io_writer_step()
   : * so that write errors are accounted globally.
   : */
722 462 : static void* io_writer_thread(void* arg)
723 : {
724 462 : struct snapraid_worker* worker = arg;
725 462 : int latest_state = TASK_STATE_DONE;
726 :
727 : while (1) {
728 : struct snapraid_task* task;
729 :
730 : /* get the new task */
731 551134 : task = io_writer_step(worker, latest_state);
732 :
733 : /* if no task, it means to exit */
734 554046 : if (!task)
735 461 : break;
736 :
737 : /* nothing more to do */
738 553585 : if (task->state == TASK_STATE_EMPTY) {
739 14361 : latest_state = TASK_STATE_DONE;
740 14361 : continue;
741 : }
742 :
743 539224 : assert(task->state == TASK_STATE_READY);
744 :
745 : /* work on the assigned task */
746 539224 : worker->func(worker, task);
747 :
748 : /* save the resulting state */
749 536311 : latest_state = task->state;
750 550672 : }
751 :
752 461 : return 0;
753 : }
754 :
   : /**
   : * Start thread mode: pre-schedule the initial reads to fill the queue
   : * and spawn the reader and writer threads.
   : */
755 95 : static void io_start_thread(struct snapraid_io* io,
756 : block_off_t blockstart, block_off_t blockmax,
757 : int (*block_is_enabled)(void* arg, block_off_t), void* blockarg)
758 : {
759 : unsigned i;
760 :
761 95 : io->block_start = blockstart;
762 95 : io->block_max = blockmax;
763 95 : io->block_is_enabled = block_is_enabled;
764 95 : io->block_arg = blockarg;
765 95 : io->block_next = blockstart;
766 :
767 95 : io->done = 0;
768 95 : io->reader_index = io->io_max - 1;
769 95 : io->writer_index = 0;
770 :
771 : /* clear writer errors */
772 475 : for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
773 380 : io->writer_error[i] = 0;
774 :
775 : /* setup the initial read pending tasks, except the latest one, */
776 : /* the latest will be initialized at the fist io_read_next() call */
777 12160 : for (i = 0; i < io->io_max - 1; ++i) {
778 12065 : block_off_t blockcur = io_position_next(io);
779 :
780 12065 : io_reader_sched(io, i, blockcur);
781 : }
782 :
783 : /* setup the lists of workers to process */
784 95 : io->reader_list[0] = io->reader_max;
785 652 : for (i = 0; i <= io->writer_max; ++i)
786 557 : io->writer_list[i] = i;
787 :
788 : /* start the reader threads */
789 743 : for (i = 0; i < io->reader_max; ++i) {
790 648 : struct snapraid_worker* worker = &io->reader_map[i];
791 :
792 648 : worker->index = 0;
793 :
794 648 : thread_create(&worker->thread, 0, io_reader_thread, worker);
795 : }
796 :
797 : /* start the writer threads */
798 557 : for (i = 0; i < io->writer_max; ++i) {
799 462 : struct snapraid_worker* worker = &io->writer_map[i];
800 :
   : /* writers start one slot behind the readers */
801 462 : worker->index = io->io_max - 1;
802 :
803 462 : thread_create(&worker->thread, 0, io_writer_thread, worker);
804 : }
805 95 : }
806 :
   : /**
   : * Stop thread mode: set the termination flag, wake all workers,
   : * and join every reader and writer thread.
   : */
807 95 : static void io_stop_thread(struct snapraid_io* io)
808 : {
809 : unsigned i;
810 :
811 95 : thread_mutex_lock(&io->io_mutex);
812 :
813 : /* mark that we are stopping */
814 95 : io->done = 1;
815 :
816 : /* signal all the threads to recognize the new state */
817 95 : thread_cond_broadcast(&io->read_sched);
818 95 : thread_cond_broadcast(&io->write_sched);
819 :
820 95 : thread_mutex_unlock(&io->io_mutex);
821 :
822 : /* wait for all readers to terminate */
823 743 : for (i = 0; i < io->reader_max; ++i) {
824 648 : struct snapraid_worker* worker = &io->reader_map[i];
825 : void* retval;
826 :
827 : /* wait for thread termination */
828 648 : thread_join(worker->thread, &retval);
829 : }
830 :
831 : /* wait for all writers to terminate */
832 557 : for (i = 0; i < io->writer_max; ++i) {
833 462 : struct snapraid_worker* worker = &io->writer_map[i];
834 : void* retval;
835 :
836 : /* wait for thread termination */
837 462 : thread_join(worker->thread, &retval);
838 : }
839 95 : }
840 :
841 : #endif
842 :
843 : /*****************************************************************************/
844 : /* global */
845 :
   : /**
   : * Initialize the IO engine.
   : *
   : * Sizes the IO cache (io_max task slots), allocates the block buffers
   : * and the reader/writer worker maps, and binds the global io_* function
   : * pointers to the thread implementation (io_max > 1 with pthread) or
   : * to the mono implementation.
   : */
846 96 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
847 : unsigned io_cache, unsigned buffer_max,
848 : void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
849 : struct snapraid_handle* handle_map, unsigned handle_max,
850 : void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
851 : void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
852 : struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
853 : {
854 : unsigned i;
855 : size_t allocated;
856 :
857 96 : io->state = state;
858 :
859 : #if HAVE_PTHREAD
860 96 : if (io_cache == 0) {
861 : /* default is 8 MiB of cache */
862 : /* this seems to be a good tradeoff between speed and memory usage */
863 95 : io->io_max = 8 * 1024 * 1024 / state->block_size;
864 95 : if (io->io_max < IO_MIN)
865 0 : io->io_max = IO_MIN;
866 95 : if (io->io_max > IO_MAX)
867 95 : io->io_max = IO_MAX;
868 : } else {
869 1 : io->io_max = io_cache;
870 : }
871 : #else
872 : (void)io_cache;
873 :
874 : /* without pthread force the mono thread mode */
875 : io->io_max = 1;
876 : #endif
877 :
878 96 : assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
879 :
880 96 : io->buffer_max = buffer_max;
881 96 : allocated = 0;
882 12257 : for (i = 0; i < io->io_max; ++i) {
   : /* direct IO needs specially aligned buffers */
883 12161 : if (state->file_mode != ADVISE_DIRECT)
884 12161 : io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, state->block_size, &io->buffer_alloc_map[i]);
885 : else
886 0 : io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, state->block_size, &io->buffer_alloc_map[i]);
887 12161 : if (!state->opt.skip_self)
888 0 : mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
889 12161 : allocated += state->block_size * buffer_max;
890 : }
891 :
892 96 : msg_progress("Using %u MiB of memory for %u blocks of IO cache.\n", (unsigned)(allocated / MEBI), io->io_max);
893 :
   : /* with a parity writer (e.g. sync) only data is read; */
   : /* otherwise (e.g. check/scrub) parity is read too */
894 96 : if (parity_writer) {
895 83 : io->reader_max = handle_max;
896 83 : io->writer_max = parity_handle_max;
897 : } else {
898 13 : io->reader_max = handle_max + parity_handle_max;
899 13 : io->writer_max = 0;
900 : }
901 :
902 96 : io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
903 96 : io->reader_list = malloc_nofail(io->reader_max + 1);
904 96 : io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
905 96 : io->writer_list = malloc_nofail(io->writer_max + 1);
906 :
907 96 : io->data_base = 0;
908 96 : io->data_count = handle_max;
909 96 : io->parity_base = handle_max;
910 96 : io->parity_count = parity_handle_max;
911 :
912 750 : for (i = 0; i < io->reader_max; ++i) {
913 654 : struct snapraid_worker* worker = &io->reader_map[i];
914 :
915 654 : worker->io = io;
916 :
917 654 : if (i < handle_max) {
918 : /* it's a data read */
919 576 : worker->handle = &handle_map[i];
920 576 : worker->parity_handle = 0;
921 576 : worker->func = data_reader;
922 :
923 : /* data read is put in lower buffer index */
924 576 : worker->buffer_skew = 0;
925 : } else {
926 : /* it's a parity read */
927 78 : worker->handle = 0;
928 78 : worker->parity_handle = &parity_handle_map[i - handle_max];
929 78 : worker->func = parity_reader;
930 :
931 : /* parity read is put after data and computed parity */
932 78 : worker->buffer_skew = parity_handle_max;
933 : }
934 : }
935 :
936 559 : for (i = 0; i < io->writer_max; ++i) {
937 463 : struct snapraid_worker* worker = &io->writer_map[i];
938 :
939 463 : worker->io = io;
940 :
941 : /* it's a parity write */
942 463 : worker->handle = 0;
943 463 : worker->parity_handle = &parity_handle_map[i];
944 463 : worker->func = parity_writer;
945 :
946 : /* parity to write is put after data */
947 463 : worker->buffer_skew = handle_max;
948 : }
949 :
950 : #if HAVE_PTHREAD
951 96 : if (io->io_max > 1) {
952 95 : io_read_next = io_read_next_thread;
953 95 : io_write_preset = io_write_preset_thread;
954 95 : io_write_next = io_write_next_thread;
955 95 : io_refresh = io_refresh_thread;
956 95 : io_data_read = io_data_read_thread;
957 95 : io_parity_read = io_parity_read_thread;
958 95 : io_parity_write = io_parity_write_thread;
959 95 : io_start = io_start_thread;
960 95 : io_stop = io_stop_thread;
961 :
962 95 : thread_mutex_init(&io->io_mutex, 0);
963 95 : thread_cond_init(&io->read_done, 0);
964 95 : thread_cond_init(&io->read_sched, 0);
965 95 : thread_cond_init(&io->write_done, 0);
966 95 : thread_cond_init(&io->write_sched, 0);
967 : } else
968 : #endif
969 : {
970 1 : io_read_next = io_read_next_mono;
971 1 : io_write_preset = io_write_preset_mono;
972 1 : io_write_next = io_write_next_mono;
973 1 : io_refresh = io_refresh_mono;
974 1 : io_data_read = io_data_read_mono;
975 1 : io_parity_read = io_parity_read_mono;
976 1 : io_parity_write = io_parity_write_mono;
977 1 : io_start = io_start_mono;
978 1 : io_stop = io_stop_mono;
979 : }
980 96 : }
981 :
   : /**
   : * Release all the resources allocated by io_init():
   : * block buffers, worker maps/lists and (in thread mode) the
   : * synchronization primitives.
   : */
982 96 : void io_done(struct snapraid_io* io)
983 : {
984 : unsigned i;
985 :
986 12257 : for (i = 0; i < io->io_max; ++i) {
987 12161 : free(io->buffer_map[i]);
988 12161 : free(io->buffer_alloc_map[i]);
989 : }
990 :
991 96 : free(io->reader_map);
992 96 : free(io->reader_list);
993 96 : free(io->writer_map);
994 96 : free(io->writer_list);
995 :
996 : #if HAVE_PTHREAD
997 96 : if (io->io_max > 1) {
998 95 : thread_mutex_destroy(&io->io_mutex);
999 95 : thread_cond_destroy(&io->read_done);
1000 95 : thread_cond_destroy(&io->read_sched);
1001 95 : thread_cond_destroy(&io->write_done);
1002 95 : thread_cond_destroy(&io->write_sched);
1003 : }
1004 : #endif
1005 96 : }
1006 :
|