LCOV - code coverage report
Current view: top level - cmdline - io.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 415 446 93.0 %
Date: 2025-10-28 11:59:11 Functions: 30 33 90.9 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (C) 2016 Andrea Mazzoleni
       3             :  *
       4             :  * This program is free software: you can redistribute it and/or modify
       5             :  * it under the terms of the GNU General Public License as published by
       6             :  * the Free Software Foundation, either version 3 of the License, or
       7             :  * (at your option) any later version.
       8             :  *
       9             :  * This program is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  * GNU General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU General Public License
      15             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "portable.h"
      19             : 
      20             : #include "io.h"
      21             : 
      22             : void (*io_start)(struct snapraid_io* io,
      23             :         block_off_t blockstart, block_off_t blockmax,
      24             :         bit_vect_t* block_enabled) = 0;
      25             : void (*io_stop)(struct snapraid_io* io) = 0;
      26             : block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
      27             : struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      28             : struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      29             : void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      30             : void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
      31             : void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
      32             : void (*io_refresh)(struct snapraid_io* io) = 0;
      33             : void (*io_flush)(struct snapraid_io* io) = 0;
      34             : 
      35             : /**
      36             :  * Get the next block position to process.
      37             :  */
      38      160035 : static block_off_t io_position_next(struct snapraid_io* io)
      39             : {
      40             :         block_off_t blockcur;
      41             : 
      42             :         /* get the next position */
      43      160035 :         if (io->block_enabled) {
      44      478075 :                 while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
      45      327941 :                         ++io->block_next;
      46             :         }
      47             : 
      48      160035 :         blockcur = io->block_next;
      49             : 
      50             :         /* next block for the next call */
      51      160035 :         ++io->block_next;
      52             : 
      53      160035 :         return blockcur;
      54             : }
      55             : 
      56             : /**
      57             :  * Setup the next pending task for all readers.
      58             :  */
      59      160035 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      60             : {
      61             :         unsigned i;
      62             : 
      63     1342701 :         for (i = 0; i < io->reader_max; ++i) {
      64     1182666 :                 struct snapraid_worker* worker = &io->reader_map[i];
      65     1182666 :                 struct snapraid_task* task = &worker->task_map[task_index];
      66             : 
      67             :                 /* setup the new pending task */
      68     1182666 :                 if (blockcur < io->block_max)
      69     1098180 :                         task->state = TASK_STATE_READY;
      70             :                 else
      71       84486 :                         task->state = TASK_STATE_EMPTY;
      72             : 
      73     1182666 :                 task->path[0] = 0;
      74     1182666 :                 if (worker->handle)
      75      960210 :                         task->disk = worker->handle->disk;
      76             :                 else
      77      222456 :                         task->disk = 0;
      78             : 
      79     1182666 :                 assert(worker->buffer_skew + i < io->buffer_max);
      80             : 
      81     1182666 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      82     1182666 :                 task->position = blockcur;
      83     1182666 :                 task->block = 0;
      84     1182666 :                 task->file = 0;
      85     1182666 :                 task->file_pos = 0;
      86     1182666 :                 task->read_size = 0;
      87     1182666 :                 task->is_timestamp_different = 0;
      88             :         }
      89      160035 : }
      90             : 
      91             : /**
      92             :  * Setup the next pending task for all writers.
      93             :  */
      94      111477 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      95             : {
      96             :         unsigned i;
      97             : 
      98      650234 :         for (i = 0; i < io->writer_max; ++i) {
      99      538757 :                 struct snapraid_worker* worker = &io->writer_map[i];
     100      538757 :                 struct snapraid_task* task = &worker->task_map[task_index];
     101             : 
     102             :                 /* setup the new pending task */
     103      538757 :                 task->state = TASK_STATE_READY;
     104      538757 :                 task->path[0] = 0;
     105      538757 :                 task->disk = 0;
     106      538757 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
     107      538757 :                 task->position = blockcur;
     108      538757 :                 task->block = 0;
     109      538757 :                 task->file = 0;
     110      538757 :                 task->file_pos = 0;
     111      538757 :                 task->read_size = 0;
     112      538757 :                 task->is_timestamp_different = 0;
     113             :         }
     114      111477 : }
     115             : 
     116             : /**
     117             :  * Setup an empty next pending task for all writers.
     118             :  */
     119         985 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
     120             : {
     121             :         unsigned i;
     122             : 
     123        6895 :         for (i = 0; i < io->writer_max; ++i) {
     124        5910 :                 struct snapraid_worker* worker = &io->writer_map[i];
     125        5910 :                 struct snapraid_task* task = &worker->task_map[task_index];
     126             : 
     127             :                 /* setup the new pending task */
     128        5910 :                 task->state = TASK_STATE_EMPTY;
     129        5910 :                 task->path[0] = 0;
     130        5910 :                 task->disk = 0;
     131        5910 :                 task->buffer = 0;
     132        5910 :                 task->position = blockcur;
     133        5910 :                 task->block = 0;
     134        5910 :                 task->file = 0;
     135        5910 :                 task->file_pos = 0;
     136        5910 :                 task->read_size = 0;
     137        5910 :                 task->is_timestamp_different = 0;
     138             :         }
     139         985 : }
     140             : 
     141             : /*****************************************************************************/
     142             : /* mono thread */
     143             : 
     144        4688 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
     145             : {
     146             :         block_off_t blockcur_schedule;
     147             : 
     148             :         /* reset the index */
     149        4688 :         io->reader_index = 0;
     150             : 
     151        4688 :         blockcur_schedule = io_position_next(io);
     152             : 
     153             :         /* schedule the next read */
     154        4688 :         io_reader_sched(io, 0, blockcur_schedule);
     155             : 
     156             :         /* set the buffer to use */
     157        4688 :         *buffer = io->buffer_map[0];
     158             : 
     159        4688 :         return blockcur_schedule;
     160             : }
     161             : 
     162        4687 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
     163             : {
     164             :         unsigned i;
     165             : 
     166             :         /* reset the index */
     167        4687 :         io->writer_index = 0;
     168             : 
     169             :         /* clear errors */
     170       23435 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     171       18748 :                 io->writer_error[i] = 0;
     172             : 
     173        4687 :         if (skip) {
     174             :                 /* skip the next write */
     175           0 :                 io_writer_sched_empty(io, 0, blockcur);
     176             :         } else {
     177             :                 /* schedule the next write */
     178        4687 :                 io_writer_sched(io, 0, blockcur);
     179             :         }
     180        4687 : }
     181             : 
     182        4687 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     183             : {
     184             :         unsigned i;
     185             : 
     186             :         (void)blockcur;
     187             :         (void)skip;
     188             : 
     189             :         /* report errors */
     190       23435 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     191       18748 :                 writer_error[i] = io->writer_error[i];
     192        4687 : }
     193             : 
     194           0 : static void io_refresh_mono(struct snapraid_io* io)
     195             : {
     196             :         (void)io;
     197           0 : }
     198             : 
     199           1 : static void io_flush_mono(struct snapraid_io* io)
     200             : {
     201             :         (void)io;
     202           1 : }
     203             : 
     204       28122 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     205             : {
     206             :         struct snapraid_worker* worker;
     207             :         struct snapraid_task* task;
     208             :         unsigned i;
     209             : 
     210             :         /* get the next task */
     211       28122 :         i = io->reader_index++;
     212             : 
     213       28122 :         assert(base <= i && i < base + count);
     214             : 
     215       28122 :         worker = &io->reader_map[i];
     216       28122 :         task = &worker->task_map[0];
     217             : 
     218             :         /* do the work */
     219       28122 :         if (task->state != TASK_STATE_EMPTY)
     220       28122 :                 worker->func(worker, task);
     221             : 
     222             :         /* return the position */
     223       28122 :         *pos = i - base;
     224             : 
     225             :         /* store the waiting index */
     226       28122 :         waiting_map[0] = i - base;
     227       28122 :         *waiting_mac = 1;
     228             : 
     229       28122 :         return task;
     230             : }
     231             : 
     232       28122 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     233             : {
     234       28122 :         return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     235             : }
     236             : 
     237           0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     238             : {
     239           0 :         return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     240             : }
     241             : 
     242        4687 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     243             : {
     244             :         struct snapraid_worker* worker;
     245             :         struct snapraid_task* task;
     246             :         unsigned i;
     247             : 
     248             :         /* get the next task */
     249        4687 :         i = io->writer_index++;
     250             : 
     251        4687 :         worker = &io->writer_map[i];
     252        4687 :         task = &worker->task_map[0];
     253             : 
     254        4687 :         io->writer_error[i] = 0;
     255             : 
     256             :         /* do the work */
     257        4687 :         if (task->state != TASK_STATE_EMPTY)
     258        4687 :                 worker->func(worker, task);
     259             : 
     260             :         /* return the position */
     261        4687 :         *pos = i;
     262             : 
     263             :         /* store the waiting index */
     264        4687 :         waiting_map[0] = i;
     265        4687 :         *waiting_mac = 1;
     266        4687 : }
     267             : 
     268           1 : static void io_start_mono(struct snapraid_io* io,
     269             :         block_off_t blockstart, block_off_t blockmax,
     270             :         bit_vect_t* block_enabled)
     271             : {
     272           1 :         io->block_start = blockstart;
     273           1 :         io->block_max = blockmax;
     274           1 :         io->block_enabled = block_enabled;
     275           1 :         io->block_next = blockstart;
     276           1 : }
     277             : 
     278           1 : static void io_stop_mono(struct snapraid_io* io)
     279             : {
     280             :         (void)io;
     281           1 : }
     282             : 
     283             : /*****************************************************************************/
     284             : /* multi thread */
     285             : 
     286             : /* disable multithread if pthread is not present */
     287             : #if HAVE_THREAD
     288             : 
     289             : /**
     290             :  * Get the next task to work on for a reader.
     291             :  *
     292             :  * This is the synchronization point for workers with the io.
     293             :  */
     294     1153294 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
     295             : {
     296     1153294 :         struct snapraid_io* io = worker->io;
     297             : 
     298             :         /* the synchronization is protected by the io mutex */
     299     1153294 :         thread_mutex_lock(&io->io_mutex);
     300             : 
     301      955717 :         while (1) {
     302             :                 unsigned next_index;
     303             : 
     304             :                 /* check if the worker has to exit */
     305             :                 /* even if there is work to do */
     306     2109011 :                 if (io->done) {
     307         660 :                         thread_mutex_unlock(&io->io_mutex);
     308         660 :                         return 0;
     309             :                 }
     310             : 
     311             :                 /* get the next pending task */
     312     2108351 :                 next_index = (worker->index + 1) % io->io_max;
     313             : 
     314             :                 /* if the queue of pending tasks is not empty */
     315     2108351 :                 if (next_index != io->reader_index) {
     316             :                         struct snapraid_task* task;
     317             : 
     318             :                         /* the index that the IO may be waiting for */
     319     1152634 :                         unsigned waiting_index = io->reader_index;
     320             : 
     321             :                         /* the index that worker just completed */
     322     1152634 :                         unsigned done_index = worker->index;
     323             : 
     324             :                         /* get the new working task */
     325     1152634 :                         worker->index = next_index;
     326     1152634 :                         task = &worker->task_map[worker->index];
     327             : 
     328             :                         /* if the just completed task is at this index */
     329     1152634 :                         if (done_index == waiting_index) {
     330             :                                 /* notify the IO that a new read is complete */
     331         924 :                                 thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
     332             :                         } else {
     333     1151710 :                                 thread_mutex_unlock(&io->io_mutex);
     334             :                         }
     335             : 
     336             :                         /* return the new task */
     337     1152634 :                         return task;
     338             :                 }
     339             : 
     340             :                 /* otherwise wait for a read_sched event */
     341      955717 :                 thread_cond_wait(&io->read_sched, &io->io_mutex);
     342             :         }
     343             : }
     344             : 
     345             : /**
     346             :  * Get the next task to work on for a writer.
     347             :  *
     348             :  * This is the synchronization point for workers with the io.
     349             :  */
     350      540442 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
     351             : {
     352      540442 :         struct snapraid_io* io = worker->io;
     353             :         int error_index;
     354             : 
     355             :         /* the synchronization is protected by the io mutex */
     356      540442 :         thread_mutex_lock(&io->io_mutex);
     357             : 
     358             :         /* counts the number of errors in the global state */
     359      540442 :         error_index = state - IO_WRITER_ERROR_BASE;
     360      540442 :         if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
     361           0 :                 ++io->writer_error[error_index];
     362             : 
     363      521854 :         while (1) {
     364             :                 unsigned next_index;
     365             : 
     366             :                 /* get the next pending task */
     367     1062296 :                 next_index = (worker->index + 1) % io->io_max;
     368             : 
     369             :                 /* if the queue of pending tasks is not empty */
     370     1062296 :                 if (next_index != io->writer_index) {
     371             :                         struct snapraid_task* task;
     372             : 
     373             :                         /* the index that the IO may be waiting for */
     374      539980 :                         unsigned waiting_index = (io->writer_index + 1) % io->io_max;
     375             : 
     376             :                         /* the index that worker just completed */
     377      539980 :                         unsigned done_index = worker->index;
     378             : 
     379             :                         /* get the new working task */
     380      539980 :                         worker->index = next_index;
     381      539980 :                         task = &worker->task_map[worker->index];
     382             : 
     383             :                         /* if the just completed task is at this index */
     384      539980 :                         if (done_index == waiting_index) {
     385             :                                 /* notify the IO that a new write is complete */
     386           6 :                                 thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
     387             :                         } else {
     388      539974 :                                 thread_mutex_unlock(&io->io_mutex);
     389             :                         }
     390             : 
     391             :                         /* return the new task */
     392      539980 :                         return task;
     393             :                 }
     394             : 
     395             :                 /* check if the worker has to exit */
     396             :                 /* but only if there is no work to do */
     397      522316 :                 if (io->done) {
     398         462 :                         thread_mutex_unlock(&io->io_mutex);
     399         462 :                         return 0;
     400             :                 }
     401             : 
     402             :                 /* otherwise wait for a write_sched event */
     403      521854 :                 thread_cond_wait(&io->write_sched, &io->io_mutex);
     404             :         }
     405             : }
     406             : 
     407             : /**
     408             :  * Get the next block position to operate on.
     409             :  *
     410             :  * This is the synchronization point for workers with the io.
     411             :  */
     412      143155 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
     413             : {
     414             :         block_off_t blockcur_schedule;
     415             :         block_off_t blockcur_caller;
     416             :         unsigned i;
     417             : 
     418             :         /* get the next parity position to process */
     419      143155 :         blockcur_schedule = io_position_next(io);
     420             : 
     421             :         /* ensure that all data/parity was read */
     422      143155 :         assert(io->reader_list[0] == io->reader_max);
     423             : 
     424             :         /* setup the list of workers to process */
     425     1357028 :         for (i = 0; i <= io->reader_max; ++i)
     426     1213873 :                 io->reader_list[i] = i;
     427             : 
     428             :         /* the synchronization is protected by the io mutex */
     429      143155 :         thread_mutex_lock(&io->io_mutex);
     430             : 
     431             :         /* schedule the next read */
     432      143155 :         io_reader_sched(io, io->reader_index, blockcur_schedule);
     433             : 
     434             :         /* set the index for the tasks to return to the caller */
     435      143155 :         io->reader_index = (io->reader_index + 1) % io->io_max;
     436             : 
     437             :         /* get the position to operate at high level from one task */
     438      143155 :         blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
     439             : 
     440             :         /* set the buffer to use */
     441      143155 :         *buffer = io->buffer_map[io->reader_index];
     442             : 
     443             :         /* signal all the workers that there is a new pending task */
     444      143155 :         thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
     445             : 
     446      143155 :         return blockcur_caller;
     447             : }
     448             : 
     449      107775 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
     450             : {
     451             :         (void)io;
     452             :         (void)blockcur;
     453             :         (void)skip;
     454      107775 : }
     455             : 
     456      107775 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     457             : {
     458             :         unsigned i;
     459             : 
     460             :         /* ensure that all parity was written */
     461      107775 :         assert(io->writer_list[0] == io->writer_max);
     462             : 
     463             :         /* setup the list of workers to process */
     464      755530 :         for (i = 0; i <= io->writer_max; ++i)
     465      647755 :                 io->writer_list[i] = i;
     466             : 
     467             :         /* the synchronization is protected by the io mutex */
     468      107775 :         thread_mutex_lock(&io->io_mutex);
     469             : 
     470             :         /* report errors */
     471      538875 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
     472      431100 :                 writer_error[i] = io->writer_error[i];
     473      431100 :                 io->writer_error[i] = 0;
     474             :         }
     475             : 
     476      107775 :         if (skip) {
     477             :                 /* skip the next write */
     478         985 :                 io_writer_sched_empty(io, io->writer_index, blockcur);
     479             :         } else {
     480             :                 /* schedule the next write */
     481      106790 :                 io_writer_sched(io, io->writer_index, blockcur);
     482             :         }
     483             : 
     484             :         /* at this point the writers must be in sync with the readers */
     485      107775 :         assert(io->writer_index == io->reader_index);
     486             : 
     487             :         /* set the index to be used for the next write */
     488      107775 :         io->writer_index = (io->writer_index + 1) % io->io_max;
     489             : 
     490             :         /* signal all the workers that there is a new pending task */
     491      107775 :         thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
     492      107775 : }
     493             : 
     494           0 : static void io_refresh_thread(struct snapraid_io* io)
     495             : {
     496             :         unsigned i;
     497             : 
     498             :         /* the synchronization is protected by the io mutex */
     499           0 :         thread_mutex_lock(&io->io_mutex);
     500             : 
     501             :         /* for all readers, count the number of read blocks */
     502           0 :         for (i = 0; i < io->reader_max; ++i) {
     503             :                 unsigned begin, end, cached;
     504           0 :                 struct snapraid_worker* worker = &io->reader_map[i];
     505             : 
     506             :                 /* the first block read */
     507           0 :                 begin = io->reader_index + 1;
     508             :                 /* the block in reading */
     509           0 :                 end = worker->index;
     510           0 :                 if (begin > end)
     511           0 :                         end += io->io_max;
     512           0 :                 cached = end - begin;
     513             : 
     514           0 :                 if (worker->parity_handle)
     515           0 :                         io->state->parity[worker->parity_handle->level].cached_blocks = cached;
     516             :                 else
     517           0 :                         worker->handle->disk->cached_blocks = cached;
     518             :         }
     519             : 
     520             :         /* for all writers, count the number of written blocks */
     521             :         /* note that this is a kind of "opposite" of cached blocks */
     522           0 :         for (i = 0; i < io->writer_max; ++i) {
     523             :                 unsigned begin, end, cached;
     524           0 :                 struct snapraid_worker* worker = &io->writer_map[i];
     525             : 
     526             :                 /* the first block written */
     527           0 :                 begin = io->writer_index + 1;
     528             :                 /* the block in writing */
     529           0 :                 end = worker->index;
     530           0 :                 if (begin > end)
     531           0 :                         end += io->io_max;
     532           0 :                 cached = end - begin;
     533             : 
     534           0 :                 io->state->parity[worker->parity_handle->level].cached_blocks = cached;
     535             :         }
     536             : 
     537           0 :         thread_mutex_unlock(&io->io_mutex);
     538           0 : }
     539             : 
     540      148589 : static void io_flush_thread(struct snapraid_io* io)
     541             : {
                               /* return only when no writer has pending tasks */
                               /* the check is repeated, yielding the CPU between attempts, until */
                               /* every writer satisfies (index + 1) % io_max == writer_index */
     542             :         unsigned i;
     543             : 
     544      148506 :         while (1) {
     545      148589 :                 int all_done = 1;
     546             : 
     547             :                 /* the synchronization is protected by the io mutex */
     548      148589 :                 thread_mutex_lock(&io->io_mutex);
     549             : 
     550      195492 :                 for (i = 0; i < io->writer_max; ++i) {
     551      195409 :                         struct snapraid_worker* worker = &io->writer_map[i];
     552             : 
     553             :                         /* get the next pending task */
     554      195409 :                         unsigned next_index = (worker->index + 1) % io->io_max;
     555             : 
     556             :                         /* if the queue of pending tasks is not empty */
     557      195409 :                         if (next_index != io->writer_index) {
                                                       /* one busy writer is enough to know we have to keep waiting */
     558      148506 :                                 all_done = 0;
     559      148506 :                                 break;
     560             :                         }
     561             :                 }
     562             : 
     563      148589 :                 thread_mutex_unlock(&io->io_mutex);
     564             : 
     565      148589 :                 if (all_done)
     566          83 :                         break;
     567             : 
     568             :                 /* wait for something to complete */
                                       /* this is a polling wait: no condition variable is used here */
     569      148506 :                 thread_yield();
     570             :         }
     571          83 : }
     572             : 
     573     1070058 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     574             : {
                               /* return the task at reader_index of the first listed reader, */
                               /* in the range [base, base + count), that has moved past reader_index */
                               /* if none is ready, wait on read_done and scan the list again */
                               /* */
                               /* reader_list is a singly linked list stored in an array: */
                               /* reader_list[0] is the head, reader_list[i + 1] is the successor */
                               /* of worker i, and the value reader_max marks the end */
                               /* */
                               /* pos: set to the selected worker index, relative to base */
                               /* waiting_map/waiting_mac: filled with the relative indexes of the */
                               /* in-range workers examined in the first scan, */
                               /* *waiting_mac is reset to 0 if a worker was ready in the first scan */
     575             :         unsigned waiting_cycle;
     576             : 
     577             :         /* count the waiting cycle */
     578     1070058 :         waiting_cycle = 0;
     579             : 
     580             :         /* clear the waiting indexes */
     581     1070058 :         *waiting_mac = 0;
     582             : 
     583             :         /* the synchronization is protected by the io mutex */
     584     1070058 :         thread_mutex_lock(&io->io_mutex);
     585             : 
     586         815 :         while (1) {
     587             :                 unsigned char* let;
     588             :                 unsigned busy_index;
     589             : 
     590             :                 /* get the index the IO is using */
     591             :                 /* we must ensure that no read is in progress at this index */
     592             :                 /* to avoid a concurrent access */
     593     1070873 :                 busy_index = io->reader_index;
     594             : 
     595             :                 /* search for a worker that has already finished */
                                       /* 'let' always points to the list slot holding the current candidate */
     596     1070873 :                 let = &io->reader_list[0];
     597        6501 :                 while (1) {
     598     1077374 :                         unsigned i = *let;
     599             : 
     600             :                         /* if we are at the end */
     601     1077374 :                         if (i == io->reader_max)
     602         815 :                                 break;
     603             : 
     604             :                         /* if it's in range */
     605     1076559 :                         if (base <= i && i < base + count) {
     606             :                                 struct snapraid_worker* worker;
     607             : 
     608             :                                 /* if it's the first cycle */
     609     1073001 :                                 if (waiting_cycle == 0) {
     610             :                                         /* store the waiting indexes */
     611     1072120 :                                         waiting_map[(*waiting_mac)++] = i - base;
     612             :                                 }
     613             : 
     614     1073001 :                                 worker = &io->reader_map[i];
     615             : 
     616             :                                 /* if the worker has finished this index */
     617     1073001 :                                 if (busy_index != worker->index) {
     618             :                                         struct snapraid_task* task;
     619             : 
     620     1070058 :                                         task = &worker->task_map[io->reader_index];
     621             : 
     622     1070058 :                                         thread_mutex_unlock(&io->io_mutex);
     623             : 
     624             :                                         /* mark the worker as processed */
     625             :                                         /* setting the previous one to point at the next one */
                                                               /* NOTE(review): the list is updated after releasing the mutex, */
                                                               /* presumably only the calling thread touches reader_list - confirm */
     626     1070058 :                                         *let = io->reader_list[i + 1];
     627             : 
     628             :                                         /* return the position */
     629     1070058 :                                         *pos = i - base;
     630             : 
     631             :                                         /* on the first cycle, no one is waiting */
     632     1070058 :                                         if (waiting_cycle == 0)
     633     1069246 :                                                 *waiting_mac = 0;
     634             : 
     635     1070058 :                                         return task;
     636             :                                 }
     637             :                         }
     638             : 
     639             :                         /* next position to check */
     640        6501 :                         let = &io->reader_list[i + 1];
     641             :                 }
     642             : 
     643             :                 /* if no worker is ready, wait for an event */
                                       /* the mutex is passed to the wait call and the scan restarts from the head */
     644         815 :                 thread_cond_wait(&io->read_done, &io->io_mutex);
     645             : 
     646             :                 /* count the cycles */
     647         815 :                 ++waiting_cycle;
     648             :         }
     649             : }
     650             : 
     651      858354 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     652             : {
                               /* get the next completed read restricted to the data workers, */
                               /* which are the range [data_base, data_base + data_count) of the readers */
     653      858354 :         return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     654             : }
     655             : 
     656      211704 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     657             : {
                               /* get the next completed read restricted to the parity workers, */
                               /* which are the range [parity_base, parity_base + parity_count) of the readers */
     658      211704 :         return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     659             : }
     660             : 
     661      539980 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     662             : {
                               /* select the first listed writer that is not working at the index */
                               /* following writer_index, and remove it from writer_list */
                               /* if none is available, wait on write_done and scan the list again */
                               /* */
                               /* writer_list uses the same layout as a linked list in an array: */
                               /* writer_list[0] is the head, writer_list[i + 1] is the successor */
                               /* of worker i, and the value writer_max marks the end */
                               /* */
                               /* pos: set to the index of the selected writer */
                               /* waiting_map/waiting_mac: filled with the writers examined in the first scan, */
                               /* *waiting_mac is reset to 0 if a writer was available in the first scan */
     663             :         unsigned waiting_cycle;
     664             : 
     665             :         /* count the waiting cycle */
     666      539980 :         waiting_cycle = 0;
     667             : 
     668             :         /* clear the waiting indexes */
     669      539980 :         *waiting_mac = 0;
     670             : 
     671             :         /* the synchronization is protected by the io mutex */
     672      539980 :         thread_mutex_lock(&io->io_mutex);
     673             : 
     674           4 :         while (1) {
     675             :                 unsigned char* let;
     676             :                 unsigned busy_index;
     677             : 
     678             :                 /* get the next index the IO is going to use */
     679             :                 /* we must ensure that no write is in progress at this index */
     680             :                 /* to avoid a concurrent access */
     681             :                 /* note that we are already sure that a write is not in progress */
     682             :                 /* at the index the IO is using right now */
     683      539984 :                 busy_index = (io->writer_index + 1) % io->io_max;
     684             : 
     685             :                 /* search for a worker that has already finished */
                                       /* 'let' always points to the list slot holding the current candidate */
     686      539984 :                 let = &io->writer_list[0];
     687           4 :                 while (1) {
     688      539988 :                         unsigned i = *let;
     689             :                         struct snapraid_worker* worker;
     690             : 
     691             :                         /* if we are at the end */
     692      539988 :                         if (i == io->writer_max)
     693           4 :                                 break;
     694             : 
     695             :                         /* if it's the first cycle */
     696      539984 :                         if (waiting_cycle == 0) {
     697             :                                 /* store the waiting indexes */
     698      539980 :                                 waiting_map[(*waiting_mac)++] = i;
     699             :                         }
     700             : 
     701      539984 :                         worker = &io->writer_map[i];
     702             : 
     703             :                         /* the two indexes cannot be equal */
     704      539984 :                         assert(io->writer_index != worker->index);
     705             : 
     706             :                         /* if the worker has finished this index */
     707      539984 :                         if (busy_index != worker->index) {
     708      539980 :                                 thread_mutex_unlock(&io->io_mutex);
     709             : 
     710             :                                 /* mark the worker as processed */
     711             :                                 /* setting the previous one to point at the next one */
                                                       /* NOTE(review): the list is updated after releasing the mutex, */
                                                       /* presumably only the calling thread touches writer_list - confirm */
     712      539980 :                                 *let = io->writer_list[i + 1];
     713             : 
     714             :                                 /* return the position */
     715      539980 :                                 *pos = i;
     716             : 
     717             :                                 /* on the first cycle, no one is waiting */
     718      539980 :                                 if (waiting_cycle == 0)
     719      539976 :                                         *waiting_mac = 0;
     720             : 
     721      539980 :                                 return;
     722             :                         }
     723             : 
     724             :                         /* next position to check */
     725           4 :                         let = &io->writer_list[i + 1];
     726             :                 }
     727             : 
     728             :                 /* if no worker is ready, wait for an event */
                                       /* the mutex is passed to the wait call and the scan restarts from the head */
     729           4 :                 thread_cond_wait(&io->write_done, &io->io_mutex);
     730             : 
     731             :                 /* count the cycles */
     732           4 :                 ++waiting_cycle;
     733             :         }
     734             : }
     735             : 
     736     1070196 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
     737             : {
                               /* execute a single read task for the worker */
                               /* a task positioned at or past block_max is not passed to the worker */
                               /* function, it's only marked as TASK_STATE_EMPTY */
     738             :         /* if we reached the end */
     739     1070196 :         if (task->position >= worker->io->block_max) {
     740             :                 /* complete a dummy task */
     741         138 :                 task->state = TASK_STATE_EMPTY;
     742             :         } else {
     743     1070058 :                 worker->func(worker, task);
     744             :         }
     745     1070196 : }
     746             : 
     747         660 : static void* io_reader_thread(void* arg)
     748             : {
                               /* entry point of a reader thread, arg is the struct snapraid_worker to serve */
                               /* it processes task_map[0] first, then every task returned by */
                               /* io_reader_step() until this returns 0; the return value is always 0 */
     749         660 :         struct snapraid_worker* worker = arg;
     750             : 
     751             :         /* force completion of the first task */
     752         660 :         io_reader_worker(worker, &worker->task_map[0]);
     753             : 
     754     1152634 :         while (1) {
     755             :                 struct snapraid_task* task;
     756             : 
     757             :                 /* get the new task */
     758     1153294 :                 task = io_reader_step(worker);
     759             : 
     760             :                 /* if no task, it means to exit */
     761     1153294 :                 if (!task)
     762         660 :                         break;
     763             : 
     764             :                 /* nothing more to do */
                                       /* an empty task is skipped without calling the worker function */
     765     1152634 :                 if (task->state == TASK_STATE_EMPTY)
     766       83098 :                         continue;
     767             : 
     768     1069536 :                 assert(task->state == TASK_STATE_READY);
     769             : 
     770             :                 /* work on the assigned task */
     771     1069536 :                 io_reader_worker(worker, task);
     772             :         }
     773             : 
     774         660 :         return 0;
     775             : }
     776             : 
     777         462 : static void* io_writer_thread(void* arg)
     778             : {
                               /* entry point of a writer thread, arg is the struct snapraid_worker to serve */
                               /* it processes every task returned by io_writer_step() until this returns 0, */
                               /* passing back at each step the state of the previously processed task */
                               /* the return value is always 0 */
     779         462 :         struct snapraid_worker* worker = arg;
                               /* before any task is processed, TASK_STATE_DONE is reported */
     780         462 :         int latest_state = TASK_STATE_DONE;
     781             : 
     782      539980 :         while (1) {
     783             :                 struct snapraid_task* task;
     784             : 
     785             :                 /* get the new task */
     786      540442 :                 task = io_writer_step(worker, latest_state);
     787             : 
     788             :                 /* if no task, it means to exit */
     789      540442 :                 if (!task)
     790         462 :                         break;
     791             : 
     792             :                 /* nothing more to do */
                                       /* an empty task is reported as done without calling the worker function */
     793      539980 :                 if (task->state == TASK_STATE_EMPTY) {
     794        5910 :                         latest_state = TASK_STATE_DONE;
     795        5910 :                         continue;
     796             :                 }
     797             : 
     798      534070 :                 assert(task->state == TASK_STATE_READY);
     799             : 
     800             :                 /* work on the assigned task */
     801      534070 :                 worker->func(worker, task);
     802             : 
     803             :                 /* save the resulting state */
     804      534070 :                 latest_state = task->state;
     805             :         }
     806             : 
     807         462 :         return 0;
     808             : }
     809             : 
     810          96 : static void io_start_thread(struct snapraid_io* io,
     811             :         block_off_t blockstart, block_off_t blockmax,
     812             :         bit_vect_t* block_enabled)
     813             : {
                               /* start a processing run over the blocks from blockstart up to blockmax */
                               /* it resets the run state in io, schedules the first io_max - 1 reads */
                               /* and creates one thread for each reader and writer worker */
                               /* block_enabled is only stored in io here */
     814             :         unsigned i;
     815             :         tommy_node* j;
     816             : 
     817             :         /* enable the filesystem mutex in all disks */
     818         670 :         for (j = io->state->disklist; j != 0; j = j->next) {
     819         574 :                 struct snapraid_disk* disk = j->data;
     820         574 :                 disk_start_thread(disk);
     821             :         }
     822             : 
     823          96 :         io->block_start = blockstart;
     824          96 :         io->block_max = blockmax;
     825          96 :         io->block_enabled = block_enabled;
     826          96 :         io->block_next = blockstart;
     827             : 
     828          96 :         io->done = 0;
     829          96 :         io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
     830          96 :         io->writer_index = 0;
     831             : 
     832             :         /* clear writer errors */
     833         480 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     834         384 :                 io->writer_error[i] = 0;
     835             : 
     836             :         /* setup the initial read pending tasks, except the latest one, */
     837             :         /* the latest will be initialized at the first io_read_next() call */
     838       12288 :         for (i = 0; i < io->io_max - 1; ++i) {
     839       12192 :                 block_off_t blockcur = io_position_next(io);
     840             : 
     841       12192 :                 io_reader_sched(io, i, blockcur);
     842             :         }
     843             : 
     844             :         /* setup the lists of workers to process */
                               /* the reader list starts empty, as its head is the end marker, */
                               /* the writer list starts with all the writers linked in order */
     845          96 :         io->reader_list[0] = io->reader_max;
     846         654 :         for (i = 0; i <= io->writer_max; ++i)
     847         558 :                 io->writer_list[i] = i;
     848             : 
     849             :         /* start the reader threads */
     850         756 :         for (i = 0; i < io->reader_max; ++i) {
     851         660 :                 struct snapraid_worker* worker = &io->reader_map[i];
     852             : 
                                       /* the readers start at index 0 */
     853         660 :                 worker->index = 0;
     854             : 
     855         660 :                 thread_create(&worker->thread, io_reader_thread, worker);
     856             :         }
     857             : 
     858             :         /* start the writer threads */
     859         558 :         for (i = 0; i < io->writer_max; ++i) {
     860         462 :                 struct snapraid_worker* worker = &io->writer_map[i];
     861             : 
                                       /* the writers start at the index just before writer_index, that is 0 */
     862         462 :                 worker->index = io->io_max - 1;
     863             : 
     864         462 :                 thread_create(&worker->thread, io_writer_thread, worker);
     865             :         }
     866          96 : }
     867             : 
     868          96 : static void io_stop_thread(struct snapraid_io* io)
     869             : {
                               /* stop the run: set the done flag, wake up the threads waiting on */
                               /* read_sched and write_sched, and join all the reader and writer threads */
     870             :         unsigned i;
     871             : 
     872          96 :         thread_mutex_lock(&io->io_mutex);
     873             : 
     874             :         /* mark that we are stopping */
     875          96 :         io->done = 1;
     876             : 
     877             :         /* signal all the threads to recognize the new state */
     878          96 :         thread_cond_broadcast(&io->read_sched);
     879          96 :         thread_cond_broadcast(&io->write_sched);
     880             : 
     881          96 :         thread_mutex_unlock(&io->io_mutex);
     882             : 
     883             :         /* wait for all readers to terminate */
     884         756 :         for (i = 0; i < io->reader_max; ++i) {
     885         660 :                 struct snapraid_worker* worker = &io->reader_map[i];
     886             :                 void* retval;
     887             : 
     888             :                 /* wait for thread termination */
                                       /* the value returned by the thread is not used */
     889         660 :                 thread_join(worker->thread, &retval);
     890             :         }
     891             : 
     892             :         /* wait for all writers to terminate */
     893         558 :         for (i = 0; i < io->writer_max; ++i) {
     894         462 :                 struct snapraid_worker* worker = &io->writer_map[i];
     895             :                 void* retval;
     896             : 
     897             :                 /* wait for thread termination */
                                       /* the value returned by the thread is not used */
     898         462 :                 thread_join(worker->thread, &retval);
     899             :         }
     900          96 : }
     901             : 
     902             : #endif
     903             : 
     904             : /*****************************************************************************/
     905             : /* global */
     906             : 
     907          97 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
     908             :         unsigned io_cache, unsigned buffer_max,
     909             :         void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
     910             :         struct snapraid_handle* handle_map, unsigned handle_max,
     911             :         void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
     912             :         void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
     913             :         struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
     914             : {
                               /* initialize the io context: bandwidth limiter, cached block buffers, */
                               /* reader and writer worker tables, and the global io_* function pointers */
                               /* */
                               /* io_cache: number of blocks to cache, 0 selects the default of */
                               /* 16 MiB / block_size limited to the range [IO_MIN, IO_MAX], */
                               /* it's ignored without HAVE_THREAD, where a single block is used */
                               /* buffer_max: number of buffers for each cached block, */
                               /* it must be at least handle_max + parity_handle_max */
                               /* parity_writer: if set, the parity handles are served by writers and */
                               /* only the data handles by readers, otherwise data and parity handles */
                               /* are all served by readers and there are no writers */
     915             :         unsigned i;
     916             :         size_t allocated_size;
     917          97 :         size_t block_size = state->block_size;
     918             : 
     919          97 :         io->state = state;
     920             : 
     921          97 :         assert(buffer_max >= handle_max + parity_handle_max);
     922             : 
     923             :         /* initialize bandwidth limiting */
     924          97 :         bw_init(&io->bw, state->opt.bwlimit);
     925             : 
     926             :         /* set IO context in handles */
     927         679 :         for (i = 0; i < handle_max; ++i)
     928         582 :                 handle_map[i].bw = &io->bw;
     929         644 :         for (i = 0; i < parity_handle_max; ++i)
     930         547 :                 parity_handle_map[i].bw = &io->bw;
     931             : 
     932             : #if HAVE_THREAD
     933          97 :         if (io_cache == 0) {
     934             :                 /* default is 16 MiB of cache */
     935             :                 /* this seems to be a good tradeoff between speed and memory usage */
     936          96 :                 io->io_max = 16 * 1024 * 1024 / state->block_size;
     937          96 :                 if (io->io_max < IO_MIN)
     938           0 :                         io->io_max = IO_MIN;
     939          96 :                 if (io->io_max > IO_MAX)
     940          96 :                         io->io_max = IO_MAX;
     941             :         } else {
                                       /* an explicit value is used as is, and only checked by the assert below */
     942           1 :                 io->io_max = io_cache;
     943             :         }
     944             : #else
     945             :         (void)io_cache;
     946             : 
     947             :         /* without pthread force the mono thread mode */
     948             :         io->io_max = 1;
     949             : #endif
     950             : 
     951          97 :         assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
     952             : 
     953          97 :         io->buffer_max = buffer_max;
     954          97 :         allocated_size = 0;
                               /* allocate a vector of buffer_max buffers for each cached block */
     955       12386 :         for (i = 0; i < io->io_max; ++i) {
     956       12289 :                 if (state->file_mode != ADVISE_DIRECT)
     957       12289 :                         io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
     958             :                 else
     959           0 :                         io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
                                       /* test the new buffers, unless skipped by the skip_self option */
     960       12289 :                 if (!state->opt.skip_self)
     961           0 :                         mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
     962       12289 :                 allocated_size += block_size * buffer_max;
     963             :         }
     964             : 
     965          97 :         msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);
     966             : 
                               /* split the handles between readers and writers */
     967          97 :         if (parity_writer) {
     968          83 :                 io->reader_max = handle_max;
     969          83 :                 io->writer_max = parity_handle_max;
     970             :         } else {
     971          14 :                 io->reader_max = handle_max + parity_handle_max;
     972          14 :                 io->writer_max = 0;
     973             :         }
     974             : 
                               /* the lists have one extra entry, used as head at index 0 */
     975          97 :         io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
     976          97 :         io->reader_list = malloc_nofail(io->reader_max + 1);
     977          97 :         io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
     978          97 :         io->writer_list = malloc_nofail(io->writer_max + 1);
     979             : 
                               /* in the reader table the data workers come first, followed by the parity ones */
     980          97 :         io->data_base = 0;
     981          97 :         io->data_count = handle_max;
     982          97 :         io->parity_base = handle_max;
     983          97 :         io->parity_count = parity_handle_max;
     984             : 
     985         763 :         for (i = 0; i < io->reader_max; ++i) {
     986         666 :                 struct snapraid_worker* worker = &io->reader_map[i];
     987             : 
     988         666 :                 worker->io = io;
     989             : 
     990         666 :                 if (i < handle_max) {
     991             :                         /* it's a data read */
     992         582 :                         worker->handle = &handle_map[i];
     993         582 :                         worker->parity_handle = 0;
     994         582 :                         worker->func = data_reader;
     995             : 
     996             :                         /* data read is put in lower buffer index */
     997         582 :                         worker->buffer_skew = 0;
     998             :                 } else {
     999             :                         /* it's a parity read */
    1000          84 :                         worker->handle = 0;
    1001          84 :                         worker->parity_handle = &parity_handle_map[i - handle_max];
    1002          84 :                         worker->func = parity_reader;
    1003             : 
    1004             :                         /* parity read is put after data and computed parity */
    1005          84 :                         worker->buffer_skew = parity_handle_max;
    1006             :                 }
    1007             :         }
    1008             : 
    1009         560 :         for (i = 0; i < io->writer_max; ++i) {
    1010         463 :                 struct snapraid_worker* worker = &io->writer_map[i];
    1011             : 
    1012         463 :                 worker->io = io;
    1013             : 
    1014             :                 /* it's a parity write */
    1015         463 :                 worker->handle = 0;
    1016         463 :                 worker->parity_handle = &parity_handle_map[i];
    1017         463 :                 worker->func = parity_writer;
    1018             : 
    1019             :                 /* parity to write is put after data */
    1020         463 :                 worker->buffer_skew = handle_max;
    1021             :         }
    1022             : 
                               /* select the implementation: threaded if more than one block is cached, */
                               /* mono otherwise; note that the function pointers are global */
    1023             : #if HAVE_THREAD
    1024          97 :         if (io->io_max > 1) {
    1025          96 :                 io_read_next = io_read_next_thread;
    1026          96 :                 io_write_preset = io_write_preset_thread;
    1027          96 :                 io_write_next = io_write_next_thread;
    1028          96 :                 io_refresh = io_refresh_thread;
    1029          96 :                 io_flush = io_flush_thread;
    1030          96 :                 io_data_read = io_data_read_thread;
    1031          96 :                 io_parity_read = io_parity_read_thread;
    1032          96 :                 io_parity_write = io_parity_write_thread;
    1033          96 :                 io_start = io_start_thread;
    1034          96 :                 io_stop = io_stop_thread;
    1035             : 
                                       /* the synchronization objects are created only in threaded mode */
    1036          96 :                 thread_mutex_init(&io->io_mutex);
    1037          96 :                 thread_cond_init(&io->read_done);
    1038          96 :                 thread_cond_init(&io->read_sched);
    1039          96 :                 thread_cond_init(&io->write_done);
    1040          96 :                 thread_cond_init(&io->write_sched);
    1041             :         } else
    1042             : #endif
    1043             :         {
    1044           1 :                 io_read_next = io_read_next_mono;
    1045           1 :                 io_write_preset = io_write_preset_mono;
    1046           1 :                 io_write_next = io_write_next_mono;
    1047           1 :                 io_refresh = io_refresh_mono;
    1048           1 :                 io_flush = io_flush_mono;
    1049           1 :                 io_data_read = io_data_read_mono;
    1050           1 :                 io_parity_read = io_parity_read_mono;
    1051           1 :                 io_parity_write = io_parity_write_mono;
    1052           1 :                 io_start = io_start_mono;
    1053           1 :                 io_stop = io_stop_mono;
    1054             :         }
    1055          97 : }
    1056             : 
    1057          97 : void io_done(struct snapraid_io* io)
    1058             : {
                               /* release the resources held by the io context: the buffers of each */
                               /* cached block, the worker tables and lists, and, when more than one */
                               /* block is cached, the mutex and the condition variables */
    1059             :         unsigned i;
    1060             : 
    1061       12386 :         for (i = 0; i < io->io_max; ++i) {
    1062       12289 :                 free(io->buffer_map[i]);
    1063       12289 :                 free(io->buffer_alloc_map[i]);
    1064             :         }
    1065             : 
    1066          97 :         free(io->reader_map);
    1067          97 :         free(io->reader_list);
    1068          97 :         free(io->writer_map);
    1069          97 :         free(io->writer_list);
    1070             : 
    1071             : #if HAVE_THREAD
                               /* same condition used at initialization to create these objects */
    1072          97 :         if (io->io_max > 1) {
    1073          96 :                 thread_mutex_destroy(&io->io_mutex);
    1074          96 :                 thread_cond_destroy(&io->read_done);
    1075          96 :                 thread_cond_destroy(&io->read_sched);
    1076          96 :                 thread_cond_destroy(&io->write_done);
    1077          96 :                 thread_cond_destroy(&io->write_sched);
    1078             :         }
    1079             : #endif
    1080          97 : }
    1081             : 

Generated by: LCOV version 1.0