LCOV - code coverage report
Current view: top level - cmdline - io.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 413 443 93.2 %
Date: 2026-04-29 15:04:44 Functions: 30 33 90.9 %

          Line data    Source code
       1             : // SPDX-License-Identifier: GPL-3.0-or-later
       2             : // Copyright (C) 2016 Andrea Mazzoleni
       3             : 
       4             : #include "portable.h"
       5             : 
       6             : #include "io.h"
       7             : 
       8             : void (*io_start)(struct snapraid_io* io,
       9             :         block_off_t blockstart, block_off_t blockmax,
      10             :         bit_vect_t* block_enabled) = 0;
      11             : void (*io_stop)(struct snapraid_io* io) = 0;
      12             : block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
      13             : struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      14             : struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      15             : void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      16             : void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
      17             : void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
      18             : void (*io_refresh)(struct snapraid_io* io) = 0;
      19             : void (*io_flush)(struct snapraid_io* io) = 0;
      20             : 
      21             : /**
      22             :  * Get the next block position to process.
      23             :  */
      24      182525 : static block_off_t io_position_next(struct snapraid_io* io)
      25             : {
      26             :         block_off_t blockcur;
      27             : 
      28             :         /* get the next position */
      29      182525 :         if (io->block_enabled) {
      30      541891 :                 while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
      31      369267 :                         ++io->block_next;
      32             :         }
      33             : 
      34      182525 :         blockcur = io->block_next;
      35             : 
      36             :         /* next block for the next call */
      37      182525 :         ++io->block_next;
      38             : 
      39      182525 :         return blockcur;
      40             : }
      41             : 
      42             : /**
      43             :  * Setup the next pending task for all readers.
      44             :  */
      45      182525 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      46             : {
      47             :         unsigned i;
      48             : 
      49     1477830 :         for (i = 0; i < io->reader_max; ++i) {
      50     1295305 :                 struct snapraid_worker* worker = &io->reader_map[i];
      51     1295305 :                 struct snapraid_task* task = &worker->task_map[task_index];
      52             : 
      53             :                 /* setup the new pending task */
      54     1295305 :                 if (blockcur < io->block_max)
      55     1201085 :                         task->state = TASK_STATE_READY;
      56             :                 else
      57       94220 :                         task->state = TASK_STATE_EMPTY;
      58             : 
      59     1295305 :                 task->path[0] = 0;
      60     1295305 :                 if (worker->handle)
      61     1071591 :                         task->disk = worker->handle->disk;
      62             :                 else
      63      223714 :                         task->disk = 0;
      64             : 
      65     1295305 :                 assert(worker->buffer_skew + i < io->buffer_max);
      66             : 
      67     1295305 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      68     1295305 :                 task->position = blockcur;
      69     1295305 :                 task->block = 0;
      70     1295305 :                 task->file = 0;
      71     1295305 :                 task->file_pos = 0;
      72     1295305 :                 task->read_size = 0;
      73     1295305 :                 task->is_timestamp_different = 0;
      74             :         }
      75      182525 : }
      76             : 
      77             : /**
      78             :  * Setup the next pending task for all writers.
      79             :  */
      80      131776 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      81             : {
      82             :         unsigned i;
      83             : 
      84      716497 :         for (i = 0; i < io->writer_max; ++i) {
      85      584721 :                 struct snapraid_worker* worker = &io->writer_map[i];
      86      584721 :                 struct snapraid_task* task = &worker->task_map[task_index];
      87             : 
      88             :                 /* setup the new pending task */
      89      584721 :                 task->state = TASK_STATE_READY;
      90      584721 :                 task->path[0] = 0;
      91      584721 :                 task->disk = 0;
      92      584721 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      93      584721 :                 task->position = blockcur;
      94      584721 :                 task->block = 0;
      95      584721 :                 task->file = 0;
      96      584721 :                 task->file_pos = 0;
      97      584721 :                 task->read_size = 0;
      98      584721 :                 task->is_timestamp_different = 0;
      99             :         }
     100      131776 : }
     101             : 
     102             : /**
     103             :  * Setup an empty next pending task for all writers.
     104             :  */
     105          19 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
     106             : {
     107             :         unsigned i;
     108             : 
     109         133 :         for (i = 0; i < io->writer_max; ++i) {
     110         114 :                 struct snapraid_worker* worker = &io->writer_map[i];
     111         114 :                 struct snapraid_task* task = &worker->task_map[task_index];
     112             : 
     113             :                 /* setup the new pending task */
     114         114 :                 task->state = TASK_STATE_EMPTY;
     115         114 :                 task->path[0] = 0;
     116         114 :                 task->disk = 0;
     117         114 :                 task->buffer = 0;
     118         114 :                 task->position = blockcur;
     119         114 :                 task->block = 0;
     120         114 :                 task->file = 0;
     121         114 :                 task->file_pos = 0;
     122         114 :                 task->read_size = 0;
     123         114 :                 task->is_timestamp_different = 0;
     124             :         }
     125          19 : }
     126             : 
     127             : /*****************************************************************************/
     128             : /* mono thread */
     129             : 
     130        9376 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
     131             : {
     132             :         block_off_t blockcur_schedule;
     133             : 
     134             :         /* reset the index */
     135        9376 :         io->reader_index = 0;
     136             : 
     137        9376 :         blockcur_schedule = io_position_next(io);
     138             : 
     139             :         /* schedule the next read */
     140        9376 :         io_reader_sched(io, 0, blockcur_schedule);
     141             : 
     142             :         /* set the buffer to use */
     143        9376 :         *buffer = io->buffer_map[0];
     144             : 
     145        9376 :         return blockcur_schedule;
     146             : }
     147             : 
     148        9374 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
     149             : {
     150             :         unsigned i;
     151             : 
     152             :         /* reset the index */
     153        9374 :         io->writer_index = 0;
     154             : 
     155             :         /* clear errors */
     156       46870 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     157       37496 :                 io->writer_error[i] = 0;
     158             : 
     159        9374 :         if (skip) {
     160             :                 /* skip the next write */
     161           0 :                 io_writer_sched_empty(io, 0, blockcur);
     162             :         } else {
     163             :                 /* schedule the next write */
     164        9374 :                 io_writer_sched(io, 0, blockcur);
     165             :         }
     166        9374 : }
     167             : 
     168        9374 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     169             : {
     170             :         unsigned i;
     171             : 
     172             :         (void)blockcur;
     173             :         (void)skip;
     174             : 
     175             :         /* report errors */
     176       46870 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     177       37496 :                 writer_error[i] = io->writer_error[i];
     178        9374 : }
     179             : 
     180           0 : static void io_refresh_mono(struct snapraid_io* io)
     181             : {
     182             :         (void)io;
     183           0 : }
     184             : 
     185           2 : static void io_flush_mono(struct snapraid_io* io)
     186             : {
     187             :         (void)io;
     188           2 : }
     189             : 
     190       56244 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     191             : {
     192             :         struct snapraid_worker* worker;
     193             :         struct snapraid_task* task;
     194             :         unsigned i;
     195             : 
     196             :         /* get the next task */
     197       56244 :         i = io->reader_index++;
     198             : 
     199       56244 :         assert(base <= i && i < base + count);
     200             : 
     201       56244 :         worker = &io->reader_map[i];
     202       56244 :         task = &worker->task_map[0];
     203             : 
     204             :         /* do the work */
     205       56244 :         if (task->state != TASK_STATE_EMPTY)
     206       56244 :                 worker->func(worker, task);
     207             : 
     208             :         /* return the position */
     209       56244 :         *pos = i - base;
     210             : 
     211             :         /* store the waiting index */
     212       56244 :         waiting_map[0] = i - base;
     213       56244 :         *waiting_mac = 1;
     214             : 
     215       56244 :         return task;
     216             : }
     217             : 
     218       56244 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     219             : {
     220       56244 :         return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     221             : }
     222             : 
     223           0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     224             : {
     225           0 :         return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     226             : }
     227             : 
     228        9374 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     229             : {
     230             :         struct snapraid_worker* worker;
     231             :         struct snapraid_task* task;
     232             :         unsigned i;
     233             : 
     234             :         /* get the next task */
     235        9374 :         i = io->writer_index++;
     236             : 
     237        9374 :         worker = &io->writer_map[i];
     238        9374 :         task = &worker->task_map[0];
     239             : 
     240        9374 :         io->writer_error[i] = 0;
     241             : 
     242             :         /* do the work */
     243        9374 :         if (task->state != TASK_STATE_EMPTY)
     244        9374 :                 worker->func(worker, task);
     245             : 
     246             :         /* return the position */
     247        9374 :         *pos = i;
     248             : 
     249             :         /* store the waiting index */
     250        9374 :         waiting_map[0] = i;
     251        9374 :         *waiting_mac = 1;
     252        9374 : }
     253             : 
     254           2 : static void io_start_mono(struct snapraid_io* io,
     255             :         block_off_t blockstart, block_off_t blockmax,
     256             :         bit_vect_t* block_enabled)
     257             : {
     258           2 :         io->block_start = blockstart;
     259           2 :         io->block_max = blockmax;
     260           2 :         io->block_enabled = block_enabled;
     261           2 :         io->block_next = blockstart;
     262           2 : }
     263             : 
     264           2 : static void io_stop_mono(struct snapraid_io* io)
     265             : {
     266             :         (void)io;
     267           2 : }
     268             : 
     269             : /*****************************************************************************/
     270             : /* multi thread */
     271             : 
     272             : /* disable multithread if pthread is not present */
     273             : #if HAVE_THREAD
     274             : 
     275             : /**
     276             :  * Get the next task to work on for a reader.
     277             :  *
     278             :  * This is the synchronization point for workers with the io.
     279             :  */
     280     1238848 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
     281             : {
     282     1238848 :         struct snapraid_io* io = worker->io;
     283             : 
     284             :         /* the synchronization is protected by the io mutex */
     285     1238848 :         thread_mutex_lock(&io->io_mutex);
     286             : 
     287     1031756 :         while (1) {
     288             :                 unsigned next_index;
     289             : 
     290             :                 /*
     291             :                  * Check if the worker has to exit
     292             :                  * even if there is work to do
     293             :                  */
     294     2270604 :                 if (io->done) {
     295         670 :                         thread_mutex_unlock(&io->io_mutex);
     296         670 :                         return 0;
     297             :                 }
     298             : 
     299             :                 /* get the next pending task */
     300     2269934 :                 next_index = (worker->index + 1) % io->io_max;
     301             : 
     302             :                 /* if the queue of pending tasks is not empty */
     303     2269934 :                 if (next_index != io->reader_index) {
     304             :                         struct snapraid_task* task;
     305             : 
     306             :                         /* the index that the IO may be waiting for */
     307     1238112 :                         unsigned waiting_index = io->reader_index;
     308             : 
     309             :                         /* the index that worker just completed */
     310     1238112 :                         unsigned done_index = worker->index;
     311             : 
     312             :                         /* get the new working task */
     313     1238112 :                         worker->index = next_index;
     314     1238112 :                         task = &worker->task_map[worker->index];
     315             : 
     316             :                         /* if the just completed task is at this index */
     317     1238112 :                         if (done_index == waiting_index) {
     318             :                                 /* notify the IO that a new read is complete */
     319        1279 :                                 thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
     320             :                         } else {
     321     1236833 :                                 thread_mutex_unlock(&io->io_mutex);
     322             :                         }
     323             : 
     324             :                         /* return the new task */
     325     1238112 :                         return task;
     326             :                 }
     327             : 
     328             :                 /* otherwise wait for a read_sched event */
     329     1031822 :                 thread_cond_wait(&io->read_sched, &io->io_mutex);
     330             :         }
     331             : }
     332             : 
     333             : /**
     334             :  * Get the next task to work on for a writer.
     335             :  *
     336             :  * This is the synchronization point for workers with the io.
     337             :  */
     338      575974 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
     339             : {
     340      575974 :         struct snapraid_io* io = worker->io;
     341             :         int error_index;
     342             : 
     343             :         /* the synchronization is protected by the io mutex */
     344      575974 :         thread_mutex_lock(&io->io_mutex);
     345             : 
     346             :         /* counts the number of errors in the global state */
     347      575974 :         error_index = state - IO_WRITER_ERROR_BASE;
     348      575974 :         if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
     349           0 :                 ++io->writer_error[error_index];
     350             : 
     351      558679 :         while (1) {
     352             :                 unsigned next_index;
     353             : 
     354             :                 /* get the next pending task */
     355     1134653 :                 next_index = (worker->index + 1) % io->io_max;
     356             : 
     357             :                 /* if the queue of pending tasks is not empty */
     358     1134653 :                 if (next_index != io->writer_index) {
     359             :                         struct snapraid_task* task;
     360             : 
     361             :                         /* the index that the IO may be waiting for */
     362      575461 :                         unsigned waiting_index = (io->writer_index + 1) % io->io_max;
     363             : 
     364             :                         /* the index that worker just completed */
     365      575461 :                         unsigned done_index = worker->index;
     366             : 
     367             :                         /* get the new working task */
     368      575461 :                         worker->index = next_index;
     369      575461 :                         task = &worker->task_map[worker->index];
     370             : 
     371             :                         /* if the just completed task is at this index */
     372      575461 :                         if (done_index == waiting_index) {
     373             :                                 /* notify the IO that a new write is complete */
     374        4426 :                                 thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
     375             :                         } else {
     376      571035 :                                 thread_mutex_unlock(&io->io_mutex);
     377             :                         }
     378             : 
     379             :                         /* return the new task */
     380      575461 :                         return task;
     381             :                 }
     382             : 
     383             :                 /*
     384             :                  * Check if the worker has to exit
     385             :                  * but only if there is no work to do
     386             :                  */
     387      559192 :                 if (io->done) {
     388         456 :                         thread_mutex_unlock(&io->io_mutex);
     389         456 :                         return 0;
     390             :                 }
     391             : 
     392             :                 /* otherwise wait for a write_sched event */
     393      558736 :                 thread_cond_wait(&io->write_sched, &io->io_mutex);
     394             :         }
     395             : }
     396             : 
     397             : /**
     398             :  * Get the next block position to operate on.
     399             :  *
     400             :  * This is the synchronization point for workers with the io.
     401             :  */
     402      158798 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
     403             : {
     404             :         block_off_t blockcur_schedule;
     405             :         block_off_t blockcur_caller;
     406             :         unsigned i;
     407             : 
     408             :         /* get the next parity position to process */
     409      158798 :         blockcur_schedule = io_position_next(io);
     410             : 
     411             :         /* ensure that all data/parity was read */
     412      158798 :         assert(io->reader_list[0] == io->reader_max);
     413             : 
     414             :         /* setup the list of workers to process */
     415     1463173 :         for (i = 0; i <= io->reader_max; ++i)
     416     1304375 :                 io->reader_list[i] = i;
     417             : 
     418             :         /* the synchronization is protected by the io mutex */
     419      158798 :         thread_mutex_lock(&io->io_mutex);
     420             : 
     421             :         /* schedule the next read */
     422      158798 :         io_reader_sched(io, io->reader_index, blockcur_schedule);
     423             : 
     424             :         /* set the index for the tasks to return to the caller */
     425      158798 :         io->reader_index = (io->reader_index + 1) % io->io_max;
     426             : 
     427             :         /* get the position to operate at high level from one task */
     428      158798 :         blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
     429             : 
     430             :         /* set the buffer to use */
     431      158798 :         *buffer = io->buffer_map[io->reader_index];
     432             : 
     433             :         /* signal all the workers that there is a new pending task */
     434      158798 :         thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
     435             : 
     436      158798 :         return blockcur_caller;
     437             : }
     438             : 
     439      122421 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
     440             : {
     441             :         (void)io;
     442             :         (void)blockcur;
     443             :         (void)skip;
     444      122421 : }
     445             : 
     446      122421 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     447             : {
     448             :         unsigned i;
     449             : 
     450             :         /* ensure that all parity was written */
     451      122421 :         assert(io->writer_list[0] == io->writer_max);
     452             : 
     453             :         /* setup the list of workers to process */
     454      820303 :         for (i = 0; i <= io->writer_max; ++i)
     455      697882 :                 io->writer_list[i] = i;
     456             : 
     457             :         /* the synchronization is protected by the io mutex */
     458      122421 :         thread_mutex_lock(&io->io_mutex);
     459             : 
     460             :         /* report errors */
     461      612105 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
     462      489684 :                 writer_error[i] = io->writer_error[i];
     463      489684 :                 io->writer_error[i] = 0;
     464             :         }
     465             : 
     466      122421 :         if (skip) {
     467             :                 /* skip the next write */
     468          19 :                 io_writer_sched_empty(io, io->writer_index, blockcur);
     469             :         } else {
     470             :                 /* schedule the next write */
     471      122402 :                 io_writer_sched(io, io->writer_index, blockcur);
     472             :         }
     473             : 
     474             :         /* at this point the writers must be in sync with the readers */
     475      122421 :         assert(io->writer_index == io->reader_index);
     476             : 
     477             :         /* set the index to be used for the next write */
     478      122421 :         io->writer_index = (io->writer_index + 1) % io->io_max;
     479             : 
     480             :         /* signal all the workers that there is a new pending task */
     481      122421 :         thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
     482      122421 : }
     483             : 
     484           0 : static void io_refresh_thread(struct snapraid_io* io)
     485             : {
     486             :         unsigned i;
     487             : 
     488             :         /* the synchronization is protected by the io mutex */
     489           0 :         thread_mutex_lock(&io->io_mutex);
     490             : 
     491             :         /* for all readers, count the number of read blocks */
     492           0 :         for (i = 0; i < io->reader_max; ++i) {
     493             :                 unsigned begin, end, cached;
     494           0 :                 struct snapraid_worker* worker = &io->reader_map[i];
     495             : 
     496             :                 /* the first block read */
     497           0 :                 begin = io->reader_index + 1;
     498             :                 /* the block in reading */
     499           0 :                 end = worker->index;
     500           0 :                 if (begin > end)
     501           0 :                         end += io->io_max;
     502           0 :                 cached = end - begin;
     503             : 
     504           0 :                 if (worker->parity_handle)
     505           0 :                         io->state->parity[worker->parity_handle->level].cached_blocks = cached;
     506             :                 else
     507           0 :                         worker->handle->disk->cached_blocks = cached;
     508             :         }
     509             : 
     510             :         /*
     511             :          * For all writers, count the number of written blocks
     512             :          * note that this is a kind of "opposite" of cached blocks
     513             :          */
     514           0 :         for (i = 0; i < io->writer_max; ++i) {
     515             :                 unsigned begin, end, cached;
     516           0 :                 struct snapraid_worker* worker = &io->writer_map[i];
     517             : 
     518             :                 /* the first block written */
     519           0 :                 begin = io->writer_index + 1;
     520             :                 /* the block in writing */
     521           0 :                 end = worker->index;
     522           0 :                 if (begin > end)
     523           0 :                         end += io->io_max;
     524           0 :                 cached = end - begin;
     525             : 
     526           0 :                 io->state->parity[worker->parity_handle->level].cached_blocks = cached;
     527             :         }
     528             : 
     529           0 :         thread_mutex_unlock(&io->io_mutex);
     530           0 : }
     531             : 
     532      116305 : static void io_flush_thread(struct snapraid_io* io)
     533             : {
     534             :         unsigned i;
     535             : 
     536      116206 :         while (1) {
     537      116305 :                 int all_done = 1;
     538             : 
     539             :                 /* the synchronization is protected by the io mutex */
     540      116305 :                 thread_mutex_lock(&io->io_mutex);
     541             : 
     542      200917 :                 for (i = 0; i < io->writer_max; ++i) {
     543      200818 :                         struct snapraid_worker* worker = &io->writer_map[i];
     544             : 
     545             :                         /* get the next pending task */
     546      200818 :                         unsigned next_index = (worker->index + 1) % io->io_max;
     547             : 
     548             :                         /* if the queue of pending tasks is not empty */
     549      200818 :                         if (next_index != io->writer_index) {
     550      116206 :                                 all_done = 0;
     551      116206 :                                 break;
     552             :                         }
     553             :                 }
     554             : 
     555      116305 :                 thread_mutex_unlock(&io->io_mutex);
     556             : 
     557      116305 :                 if (all_done)
     558          99 :                         break;
     559             : 
     560             :                 /* wait for something to complete */
     561      116206 :                 thread_yield();
     562             :         }
     563          99 : }
     564             : 
     565     1144841 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     566             : {
     567             :         unsigned waiting_cycle;
     568             : 
     569             :         /* count the waiting cycle */
     570     1144841 :         waiting_cycle = 0;
     571             : 
     572             :         /* clear the waiting indexes */
     573     1144841 :         *waiting_mac = 0;
     574             : 
     575             :         /* the synchronization is protected by the io mutex */
     576     1144841 :         thread_mutex_lock(&io->io_mutex);
     577             : 
     578        1147 :         while (1) {
     579             :                 unsigned char* let;
     580             :                 unsigned busy_index;
     581             : 
     582             :                 /*
     583             :                  * Get the index the IO is using
     584             :                  * we must ensure that this index has not a read in progress
     585             :                  * to avoid a concurrent access
     586             :                  */
     587     1145988 :                 busy_index = io->reader_index;
     588             : 
     589             :                 /* search for a worker that has already finished */
     590     1145988 :                 let = &io->reader_list[0];
     591        7048 :                 while (1) {
     592     1153036 :                         unsigned i = *let;
     593             : 
     594             :                         /* if we are at the end */
     595     1153036 :                         if (i == io->reader_max)
     596        1147 :                                 break;
     597             : 
     598             :                         /* if it's in range */
     599     1151889 :                         if (base <= i && i < base + count) {
     600             :                                 struct snapraid_worker* worker;
     601             : 
     602             :                                 /* if it's the first cycle */
     603     1148282 :                                 if (waiting_cycle == 0) {
     604             :                                         /* store the waiting indexes */
     605     1147057 :                                         waiting_map[(*waiting_mac)++] = i - base;
     606             :                                 }
     607             : 
     608     1148282 :                                 worker = &io->reader_map[i];
     609             : 
     610             :                                 /* if the worker has finished this index */
     611     1148282 :                                 if (busy_index != worker->index) {
     612             :                                         struct snapraid_task* task;
     613             : 
     614     1144841 :                                         task = &worker->task_map[io->reader_index];
     615             : 
     616     1144841 :                                         thread_mutex_unlock(&io->io_mutex);
     617             : 
     618             :                                         /*
     619             :                                          * Mark the worker as processed
     620             :                                          * setting the previous one to point at the next one
     621             :                                          */
     622     1144841 :                                         *let = io->reader_list[i + 1];
     623             : 
     624             :                                         /* return the position */
     625     1144841 :                                         *pos = i - base;
     626             : 
     627             :                                         /* on the first cycle, no one is waiting */
     628     1144841 :                                         if (waiting_cycle == 0)
     629     1143696 :                                                 *waiting_mac = 0;
     630             : 
     631     1144841 :                                         return task;
     632             :                                 }
     633             :                         }
     634             : 
     635             :                         /* next position to check */
     636        7048 :                         let = &io->reader_list[i + 1];
     637             :                 }
     638             : 
     639             :                 /* if no worker is ready, wait for an event */
     640        1147 :                 thread_cond_wait(&io->read_done, &io->io_mutex);
     641             : 
     642             :                 /* count the cycles */
     643        1147 :                 ++waiting_cycle;
     644             :         }
     645             : }
     646             : 
     647      932007 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     648             : {
     649      932007 :         return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     650             : }
     651             : 
     652      212834 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     653             : {
     654      212834 :         return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     655             : }
     656             : 
     657      575461 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     658             : {
     659             :         unsigned waiting_cycle;
     660             : 
     661             :         /* count the waiting cycle */
     662      575461 :         waiting_cycle = 0;
     663             : 
     664             :         /* clear the waiting indexes */
     665      575461 :         *waiting_mac = 0;
     666             : 
     667             :         /* the synchronization is protected by the io mutex */
     668      575461 :         thread_mutex_lock(&io->io_mutex);
     669             : 
     670        4409 :         while (1) {
     671             :                 unsigned char* let;
     672             :                 unsigned busy_index;
     673             : 
     674             :                 /*
     675             :                  * Get the next index the IO is going to use
     676             :                  * we must ensure that this index has not a write in progress
     677             :                  * to avoid a concurrent access
     678             :                  * note that we are already sure that a write is not in progress
     679             :                  * at the index the IO is using at now
     680             :                  */
     681      579870 :                 busy_index = (io->writer_index + 1) % io->io_max;
     682             : 
     683             :                 /* search for a worker that has already finished */
     684      579870 :                 let = &io->writer_list[0];
     685        4409 :                 while (1) {
     686      584279 :                         unsigned i = *let;
     687             :                         struct snapraid_worker* worker;
     688             : 
     689             :                         /* if we are at the end */
     690      584279 :                         if (i == io->writer_max)
     691        4409 :                                 break;
     692             : 
     693             :                         /* if it's the first cycle */
     694      579870 :                         if (waiting_cycle == 0) {
     695             :                                 /* store the waiting indexes */
     696      575461 :                                 waiting_map[(*waiting_mac)++] = i;
     697             :                         }
     698             : 
     699      579870 :                         worker = &io->writer_map[i];
     700             : 
     701             :                         /* the two indexes cannot be equal */
     702      579870 :                         assert(io->writer_index != worker->index);
     703             : 
     704             :                         /* if the worker has finished this index */
     705      579870 :                         if (busy_index != worker->index) {
     706      575461 :                                 thread_mutex_unlock(&io->io_mutex);
     707             : 
     708             :                                 /*
     709             :                                  * Mark the worker as processed
     710             :                                  * setting the previous one to point at the next one
     711             :                                  */
     712      575461 :                                 *let = io->writer_list[i + 1];
     713             : 
     714             :                                 /* return the position */
     715      575461 :                                 *pos = i;
     716             : 
     717             :                                 /* on the first cycle, no one is waiting */
     718      575461 :                                 if (waiting_cycle == 0)
     719      571052 :                                         *waiting_mac = 0;
     720             : 
     721      575461 :                                 return;
     722             :                         }
     723             : 
     724             :                         /* next position to check */
     725        4409 :                         let = &io->writer_list[i + 1];
     726             :                 }
     727             : 
     728             :                 /* if no worker is ready, wait for an event */
     729        4409 :                 thread_cond_wait(&io->write_done, &io->io_mutex);
     730             : 
     731             :                 /* count the cycles */
     732        4409 :                 ++waiting_cycle;
     733             :         }
     734             : }
     735             : 
     736     1144979 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
     737             : {
     738             :         /* if we reached the end */
     739     1144979 :         if (task->position >= worker->io->block_max) {
     740             :                 /* complete a dummy task */
     741         138 :                 task->state = TASK_STATE_EMPTY;
     742             :         } else {
     743     1144841 :                 worker->func(worker, task);
     744             :         }
     745     1144979 : }
     746             : 
     747         736 : static void* io_reader_thread(void* arg)
     748             : {
     749         736 :         struct snapraid_worker* worker = arg;
     750             : 
     751             :         /* force completion of the first task */
     752         736 :         io_reader_worker(worker, &worker->task_map[0]);
     753             : 
     754     1238112 :         while (1) {
     755             :                 struct snapraid_task* task;
     756             : 
     757             :                 /* get the new task */
     758     1238848 :                 task = io_reader_step(worker);
     759             : 
     760             :                 /* if no task, it means to exit */
     761     1238782 :                 if (!task)
     762         670 :                         break;
     763             : 
     764             :                 /* nothing more to do */
     765     1238112 :                 if (task->state == TASK_STATE_EMPTY)
     766       93869 :                         continue;
     767             : 
     768     1144243 :                 assert(task->state == TASK_STATE_READY);
     769             : 
     770             :                 /* work on the assigned task */
     771     1144243 :                 io_reader_worker(worker, task);
     772             :         }
     773             : 
     774         670 :         return 0;
     775             : }
     776             : 
     777         513 : static void* io_writer_thread(void* arg)
     778             : {
     779         513 :         struct snapraid_worker* worker = arg;
     780         513 :         int latest_state = TASK_STATE_DONE;
     781             : 
     782      575461 :         while (1) {
     783             :                 struct snapraid_task* task;
     784             : 
     785             :                 /* get the new task */
     786      575974 :                 task = io_writer_step(worker, latest_state);
     787             : 
     788             :                 /* if no task, it means to exit */
     789      575917 :                 if (!task)
     790         456 :                         break;
     791             : 
     792             :                 /* nothing more to do */
     793      575461 :                 if (task->state == TASK_STATE_EMPTY) {
     794         114 :                         latest_state = TASK_STATE_DONE;
     795         114 :                         continue;
     796             :                 }
     797             : 
     798      575347 :                 assert(task->state == TASK_STATE_READY);
     799             : 
     800             :                 /* work on the assigned task */
     801      575347 :                 worker->func(worker, task);
     802             : 
     803             :                 /* save the resulting state */
     804      575347 :                 latest_state = task->state;
     805             :         }
     806             : 
     807         456 :         return 0;
     808             : }
     809             : 
     810         113 : static void io_start_thread(struct snapraid_io* io,
     811             :         block_off_t blockstart, block_off_t blockmax,
     812             :         bit_vect_t* block_enabled)
     813             : {
     814             :         unsigned i;
     815             : 
     816         113 :         io->block_start = blockstart;
     817         113 :         io->block_max = blockmax;
     818         113 :         io->block_enabled = block_enabled;
     819         113 :         io->block_next = blockstart;
     820             : 
     821         113 :         io->done = 0;
     822         113 :         io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
     823         113 :         io->writer_index = 0;
     824             : 
     825             :         /* clear writer errors */
     826         565 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     827         452 :                 io->writer_error[i] = 0;
     828             : 
     829             :         /*
     830             :          * Setup the initial read pending tasks, except the latest one,
     831             :          * the latest will be initialized at the fist io_read_next() call
     832             :          */
     833       14464 :         for (i = 0; i < io->io_max - 1; ++i) {
     834       14351 :                 block_off_t blockcur = io_position_next(io);
     835             : 
     836       14351 :                 io_reader_sched(io, i, blockcur);
     837             :         }
     838             : 
     839             :         /* setup the lists of workers to process */
     840         113 :         io->reader_list[0] = io->reader_max;
     841         739 :         for (i = 0; i <= io->writer_max; ++i)
     842         626 :                 io->writer_list[i] = i;
     843             : 
     844             :         /* start the reader threads */
     845         849 :         for (i = 0; i < io->reader_max; ++i) {
     846         736 :                 struct snapraid_worker* worker = &io->reader_map[i];
     847             : 
     848         736 :                 worker->index = 0;
     849             : 
     850         736 :                 thread_create(&worker->thread, io_reader_thread, worker);
     851             :         }
     852             : 
     853             :         /* start the writer threads */
     854         626 :         for (i = 0; i < io->writer_max; ++i) {
     855         513 :                 struct snapraid_worker* worker = &io->writer_map[i];
     856             : 
     857         513 :                 worker->index = io->io_max - 1;
     858             : 
     859         513 :                 thread_create(&worker->thread, io_writer_thread, worker);
     860             :         }
     861         113 : }
     862             : 
     863         101 : static void io_stop_thread(struct snapraid_io* io)
     864             : {
     865             :         unsigned i;
     866             : 
     867         101 :         thread_mutex_lock(&io->io_mutex);
     868             : 
     869             :         /* mark that we are stopping */
     870         101 :         io->done = 1;
     871             : 
     872             :         /* signal all the threads to recognize the new state */
     873         101 :         thread_cond_broadcast(&io->read_sched);
     874         101 :         thread_cond_broadcast(&io->write_sched);
     875             : 
     876         101 :         thread_mutex_unlock(&io->io_mutex);
     877             : 
     878             :         /* wait for all readers to terminate */
     879         771 :         for (i = 0; i < io->reader_max; ++i) {
     880         670 :                 struct snapraid_worker* worker = &io->reader_map[i];
     881             :                 void* retval;
     882             : 
     883             :                 /* wait for thread termination */
     884         670 :                 thread_join(worker->thread, &retval);
     885             :         }
     886             : 
     887             :         /* wait for all writers to terminate */
     888         557 :         for (i = 0; i < io->writer_max; ++i) {
     889         456 :                 struct snapraid_worker* worker = &io->writer_map[i];
     890             :                 void* retval;
     891             : 
     892             :                 /* wait for thread termination */
     893         456 :                 thread_join(worker->thread, &retval);
     894             :         }
     895         101 : }
     896             : 
     897             : #endif
     898             : 
     899             : /*****************************************************************************/
     900             : /* global */
     901             : 
     902         115 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
     903             :         unsigned io_cache, unsigned buffer_max,
     904             :         void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
     905             :         struct snapraid_handle* handle_map, unsigned handle_max,
     906             :         void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
     907             :         void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
     908             :         struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
     909             : {
     910             :         unsigned i;
     911             :         size_t allocated_size;
     912         115 :         size_t block_size = state->block_size;
     913             : 
     914         115 :         io->state = state;
     915             : 
     916         115 :         assert(buffer_max >= handle_max + parity_handle_max);
     917             : 
     918             :         /* initialize bandwidth limiting */
     919         115 :         bw_init(&io->bw, state->opt.bwlimit);
     920             : 
     921             :         /* set IO context in handles */
     922         778 :         for (i = 0; i < handle_max; ++i)
     923         663 :                 handle_map[i].bw = &io->bw;
     924         715 :         for (i = 0; i < parity_handle_max; ++i)
     925         600 :                 parity_handle_map[i].bw = &io->bw;
     926             : 
     927             : #if HAVE_THREAD
     928         115 :         if (io_cache == 0) {
     929             :                 /*
     930             :                  * Default is 16 MiB of cache
     931             :                  * this seems to be a good tradeoff between speed and memory usage
     932             :                  */
     933         113 :                 io->io_max = 16 * 1024 * 1024 / state->block_size;
     934         113 :                 if (io->io_max < IO_MIN)
     935           0 :                         io->io_max = IO_MIN;
     936         113 :                 if (io->io_max > IO_MAX)
     937         113 :                         io->io_max = IO_MAX;
     938             :         } else {
     939           2 :                 io->io_max = io_cache;
     940             :         }
     941             : #else
     942             :         (void)io_cache;
     943             : 
     944             :         /* without pthread force the mono thread mode */
     945             :         io->io_max = 1;
     946             : #endif
     947             : 
     948         115 :         assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
     949             : 
     950         115 :         io->buffer_max = buffer_max;
     951         115 :         allocated_size = 0;
     952       14581 :         for (i = 0; i < io->io_max; ++i) {
     953       14466 :                 if (state->file_mode != ADVISE_DIRECT)
     954       14338 :                         io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
     955             :                 else
     956         128 :                         io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
     957       14466 :                 if (!state->opt.skip_self)
     958           0 :                         mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
     959       14466 :                 allocated_size += block_size * buffer_max;
     960             :         }
     961             : 
     962         115 :         msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);
     963             : 
     964         115 :         if (parity_writer) {
     965         100 :                 io->reader_max = handle_max;
     966         100 :                 io->writer_max = parity_handle_max;
     967             :         } else {
     968          15 :                 io->reader_max = handle_max + parity_handle_max;
     969          15 :                 io->writer_max = 0;
     970             :         }
     971             : 
     972         115 :         io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
     973         115 :         io->reader_list = malloc_nofail(io->reader_max + 1);
     974         115 :         io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
     975         115 :         io->writer_list = malloc_nofail(io->writer_max + 1);
     976             : 
     977         115 :         io->data_base = 0;
     978         115 :         io->data_count = handle_max;
     979         115 :         io->parity_base = handle_max;
     980         115 :         io->parity_count = parity_handle_max;
     981             : 
     982         863 :         for (i = 0; i < io->reader_max; ++i) {
     983         748 :                 struct snapraid_worker* worker = &io->reader_map[i];
     984             : 
     985         748 :                 worker->io = io;
     986             : 
     987         748 :                 if (i < handle_max) {
     988             :                         /* it's a data read */
     989         663 :                         worker->handle = &handle_map[i];
     990         663 :                         worker->parity_handle = 0;
     991         663 :                         worker->func = data_reader;
     992             : 
     993             :                         /* data read is put in lower buffer index */
     994         663 :                         worker->buffer_skew = 0;
     995             :                 } else {
     996             :                         /* it's a parity read */
     997          85 :                         worker->handle = 0;
     998          85 :                         worker->parity_handle = &parity_handle_map[i - handle_max];
     999          85 :                         worker->func = parity_reader;
    1000             : 
    1001             :                         /* parity read is put after data and computed parity */
    1002          85 :                         worker->buffer_skew = parity_handle_max;
    1003             :                 }
    1004             :         }
    1005             : 
    1006         630 :         for (i = 0; i < io->writer_max; ++i) {
    1007         515 :                 struct snapraid_worker* worker = &io->writer_map[i];
    1008             : 
    1009         515 :                 worker->io = io;
    1010             : 
    1011             :                 /* it's a parity write */
    1012         515 :                 worker->handle = 0;
    1013         515 :                 worker->parity_handle = &parity_handle_map[i];
    1014         515 :                 worker->func = parity_writer;
    1015             : 
    1016             :                 /* parity to write is put after data */
    1017         515 :                 worker->buffer_skew = handle_max;
    1018             :         }
    1019             : 
    1020             : #if HAVE_THREAD
    1021         115 :         if (io->io_max > 1) {
    1022         113 :                 io_read_next = io_read_next_thread;
    1023         113 :                 io_write_preset = io_write_preset_thread;
    1024         113 :                 io_write_next = io_write_next_thread;
    1025         113 :                 io_refresh = io_refresh_thread;
    1026         113 :                 io_flush = io_flush_thread;
    1027         113 :                 io_data_read = io_data_read_thread;
    1028         113 :                 io_parity_read = io_parity_read_thread;
    1029         113 :                 io_parity_write = io_parity_write_thread;
    1030         113 :                 io_start = io_start_thread;
    1031         113 :                 io_stop = io_stop_thread;
    1032             : 
    1033         113 :                 thread_mutex_init(&io->io_mutex);
    1034         113 :                 thread_cond_init(&io->read_done);
    1035         113 :                 thread_cond_init(&io->read_sched);
    1036         113 :                 thread_cond_init(&io->write_done);
    1037         113 :                 thread_cond_init(&io->write_sched);
    1038             :         } else
    1039             : #endif
    1040             :         {
    1041           2 :                 io_read_next = io_read_next_mono;
    1042           2 :                 io_write_preset = io_write_preset_mono;
    1043           2 :                 io_write_next = io_write_next_mono;
    1044           2 :                 io_refresh = io_refresh_mono;
    1045           2 :                 io_flush = io_flush_mono;
    1046           2 :                 io_data_read = io_data_read_mono;
    1047           2 :                 io_parity_read = io_parity_read_mono;
    1048           2 :                 io_parity_write = io_parity_write_mono;
    1049           2 :                 io_start = io_start_mono;
    1050           2 :                 io_stop = io_stop_mono;
    1051             :         }
    1052         115 : }
    1053             : 
    1054         103 : void io_done(struct snapraid_io* io)
    1055             : {
    1056             :         unsigned i;
    1057             : 
    1058       13033 :         for (i = 0; i < io->io_max; ++i) {
    1059       12930 :                 free(io->buffer_map[i]);
    1060       12930 :                 free(io->buffer_alloc_map[i]);
    1061             :         }
    1062             : 
    1063         103 :         free(io->reader_map);
    1064         103 :         free(io->reader_list);
    1065         103 :         free(io->writer_map);
    1066         103 :         free(io->writer_list);
    1067             : 
    1068             : #if HAVE_THREAD
    1069         103 :         if (io->io_max > 1) {
    1070         101 :                 thread_mutex_destroy(&io->io_mutex);
    1071         101 :                 thread_cond_destroy(&io->read_done);
    1072         101 :                 thread_cond_destroy(&io->read_sched);
    1073         101 :                 thread_cond_destroy(&io->write_done);
    1074         101 :                 thread_cond_destroy(&io->write_sched);
    1075             :         }
    1076             : #endif
    1077         103 : }
    1078             : 

Generated by: LCOV version 1.0