LCOV - code coverage report
Current view: top level - cmdline - io.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 413 443 93.2 %
Date: 2026-03-01 15:35:05 Functions: 30 33 90.9 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (C) 2016 Andrea Mazzoleni
       3             :  *
       4             :  * This program is free software: you can redistribute it and/or modify
       5             :  * it under the terms of the GNU General Public License as published by
       6             :  * the Free Software Foundation, either version 3 of the License, or
       7             :  * (at your option) any later version.
       8             :  *
       9             :  * This program is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  * GNU General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU General Public License
      15             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "portable.h"
      19             : 
      20             : #include "io.h"
      21             : 
      22             : void (*io_start)(struct snapraid_io* io,
      23             :         block_off_t blockstart, block_off_t blockmax,
      24             :         bit_vect_t* block_enabled) = 0; /* like all the io_* entry points below, it starts unset (0) */
      25             : void (*io_stop)(struct snapraid_io* io) = 0;
      26             : block_off_t (*io_read_next)(struct snapraid_io* io, void*** buffer) = 0;
      27             : struct snapraid_task* (*io_data_read)(struct snapraid_io* io, unsigned* diskcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      28             : struct snapraid_task* (*io_parity_read)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      29             : void (*io_parity_write)(struct snapraid_io* io, unsigned* levcur, unsigned* waiting_map, unsigned* waiting_mac) = 0;
      30             : void (*io_write_preset)(struct snapraid_io* io, block_off_t blockcur, int skip) = 0;
      31             : void (*io_write_next)(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error) = 0;
      32             : void (*io_refresh)(struct snapraid_io* io) = 0;
      33             : void (*io_flush)(struct snapraid_io* io) = 0; /* NOTE(review): presumably these are bound to the _mono or _thread functions elsewhere - confirm */
      34             : 
      35             : /**
      36             :  * Get the next block position to process, advancing io->block_next past it.
      37             :  */
      38      174856 : static block_off_t io_position_next(struct snapraid_io* io)
      39             : {
      40             :         block_off_t blockcur;
      41             : 
      42             :         /* get the next position, skipping the blocks for which bit_vect_test() on block_enabled is false */
      43      174856 :         if (io->block_enabled) {
      44      532189 :                 while (io->block_next < io->block_max && !bit_vect_test(io->block_enabled, io->block_next))
      45      367234 :                         ++io->block_next;
      46             :         }
      47             : 
      48      174856 :         blockcur = io->block_next;
      49             : 
      50             :         /* next block for the next call, incremented even if already at or past block_max */
      51      174856 :         ++io->block_next;
      52             : 
      53      174856 :         return blockcur;
      54             : }
      55             : 
      56             : /**
      57             :  * Setup the task slot 'task_index' as the next pending task for all readers, at position 'blockcur'.
      58             :  */
      59      174856 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      60             : {
      61             :         unsigned i;
      62             : 
      63     1446628 :         for (i = 0; i < io->reader_max; ++i) {
      64     1271772 :                 struct snapraid_worker* worker = &io->reader_map[i];
      65     1271772 :                 struct snapraid_task* task = &worker->task_map[task_index];
      66             : 
      67             :                 /* setup the new pending task: READY if the position is before block_max, EMPTY otherwise */
      68     1271772 :                 if (blockcur < io->block_max)
      69     1181136 :                         task->state = TASK_STATE_READY;
      70             :                 else
      71       90636 :                         task->state = TASK_STATE_EMPTY;
      72             : 
      73     1271772 :                 task->path[0] = 0;
      74     1271772 :                 if (worker->handle) /* take the disk from the worker handle, when the worker has one */
      75     1049136 :                         task->disk = worker->handle->disk;
      76             :                 else
      77      222636 :                         task->disk = 0;
      78             : 
      79     1271772 :                 assert(worker->buffer_skew + i < io->buffer_max); /* the buffer index used just below must be in range */
      80             : 
      81     1271772 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      82     1271772 :                 task->position = blockcur;
      83     1271772 :                 task->block = 0;
      84     1271772 :                 task->file = 0;
      85     1271772 :                 task->file_pos = 0;
      86     1271772 :                 task->read_size = 0;
      87     1271772 :                 task->is_timestamp_different = 0;
      88             :         }
      89      174856 : }
      90             : 
      91             : /**
      92             :  * Setup the task slot 'task_index' as the next pending task for all writers, always in READY state.
      93             :  */
      94      125307 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      95             : {
      96             :         unsigned i;
      97             : 
      98      699969 :         for (i = 0; i < io->writer_max; ++i) {
      99      574662 :                 struct snapraid_worker* worker = &io->writer_map[i];
     100      574662 :                 struct snapraid_task* task = &worker->task_map[task_index];
     101             : 
     102             :                 /* setup the new pending task, with the buffer selected by the worker skew and index */
     103      574662 :                 task->state = TASK_STATE_READY;
     104      574662 :                 task->path[0] = 0;
     105      574662 :                 task->disk = 0;
     106      574662 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
     107      574662 :                 task->position = blockcur;
     108      574662 :                 task->block = 0;
     109      574662 :                 task->file = 0;
     110      574662 :                 task->file_pos = 0;
     111      574662 :                 task->read_size = 0;
     112      574662 :                 task->is_timestamp_different = 0;
     113             :         }
     114      125307 : }
     115             : 
     116             : /**
     117             :  * Setup an empty (TASK_STATE_EMPTY, without buffer) next pending task for all writers.
     118             :  */
     119         921 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
     120             : {
     121             :         unsigned i;
     122             : 
     123        6447 :         for (i = 0; i < io->writer_max; ++i) {
     124        5526 :                 struct snapraid_worker* worker = &io->writer_map[i];
     125        5526 :                 struct snapraid_task* task = &worker->task_map[task_index];
     126             : 
     127             :                 /* setup the new pending task, keeping only the position */
     128        5526 :                 task->state = TASK_STATE_EMPTY;
     129        5526 :                 task->path[0] = 0;
     130        5526 :                 task->disk = 0;
     131        5526 :                 task->buffer = 0;
     132        5526 :                 task->position = blockcur;
     133        5526 :                 task->block = 0;
     134        5526 :                 task->file = 0;
     135        5526 :                 task->file_pos = 0;
     136        5526 :                 task->read_size = 0;
     137        5526 :                 task->is_timestamp_different = 0;
     138             :         }
     139         921 : }
     140             : 
     141             : /*****************************************************************************/
     142             : /* mono thread */
     143             : 
     144        9376 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
     145             : {
     146             :         block_off_t blockcur_schedule;
     147             : 
     148             :         /* reset the index, restarting the sequence of readers consumed by io_task_read_mono() */
     149        9376 :         io->reader_index = 0;
     150             : 
     151        9376 :         blockcur_schedule = io_position_next(io);
     152             : 
     153             :         /* schedule the next read, always in the task slot 0 */
     154        9376 :         io_reader_sched(io, 0, blockcur_schedule);
     155             : 
     156             :         /* set the buffer to use, the one of the task slot 0 */
     157        9376 :         *buffer = io->buffer_map[0];
     158             : 
     159        9376 :         return blockcur_schedule;
     160             : }
     161             : 
     162        9374 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
     163             : {
     164             :         unsigned i;
     165             : 
     166             :         /* reset the index, restarting the sequence of writers consumed by io_parity_write_mono() */
     167        9374 :         io->writer_index = 0;
     168             : 
     169             :         /* clear errors, one counter for each error class */
     170       46870 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     171       37496 :                 io->writer_error[i] = 0;
     172             : 
     173        9374 :         if (skip) {
     174             :                 /* skip the next write, scheduling empty tasks in the slot 0 */
     175           0 :                 io_writer_sched_empty(io, 0, blockcur);
     176             :         } else {
     177             :                 /* schedule the next write in the slot 0 */
     178        9374 :                 io_writer_sched(io, 0, blockcur);
     179             :         }
     180        9374 : }
     181             : 
     182        9374 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     183             : {
     184             :         unsigned i;
     185             : 
     186             :         (void)blockcur; /* unused: the scheduling is done by io_write_preset_mono() */
     187             :         (void)skip;
     188             : 
     189             :         /* report errors, copying the counters to the caller without clearing them */
     190       46870 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     191       37496 :                 writer_error[i] = io->writer_error[i];
     192        9374 : }
     193             : 
     194           0 : static void io_refresh_mono(struct snapraid_io* io)
     195             : {
     196             :         (void)io; /* intentionally empty: nothing is done with the io here */
     197           0 : }
     198             : 
     199           2 : static void io_flush_mono(struct snapraid_io* io)
     200             : {
     201             :         (void)io; /* intentionally empty: nothing is done with the io here */
     202           2 : }
     203             : 
     204       56244 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     205             : {
     206             :         struct snapraid_worker* worker;
     207             :         struct snapraid_task* task;
     208             :         unsigned i;
     209             : 
     210             :         /* get the next task, advancing the reader index */
     211       56244 :         i = io->reader_index++;
     212             : 
     213       56244 :         assert(base <= i && i < base + count); /* the reader must be in the range requested by the caller */
     214             : 
     215       56244 :         worker = &io->reader_map[i];
     216       56244 :         task = &worker->task_map[0];
     217             : 
     218             :         /* do the work with a direct call, unless the task is empty */
     219       56244 :         if (task->state != TASK_STATE_EMPTY)
     220       56244 :                 worker->func(worker, task);
     221             : 
     222             :         /* return the position, relative to base */
     223       56244 :         *pos = i - base;
     224             : 
     225             :         /* store the waiting index, always only the one just processed */
     226       56244 :         waiting_map[0] = i - base;
     227       56244 :         *waiting_mac = 1;
     228             : 
     229       56244 :         return task;
     230             : }
     231             : 
     232       56244 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     233             : {
     234       56244 :         return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac); /* use the data_base/data_count range of readers */
     235             : }
     236             : 
     237           0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     238             : {
     239           0 :         return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac); /* use the parity_base/parity_count range of readers */
     240             : }
     241             : 
     242        9374 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     243             : {
     244             :         struct snapraid_worker* worker;
     245             :         struct snapraid_task* task;
     246             :         unsigned i;
     247             : 
     248             :         /* get the next task, advancing the writer index */
     249        9374 :         i = io->writer_index++;
     250             : 
     251        9374 :         worker = &io->writer_map[i];
     252        9374 :         task = &worker->task_map[0];
     253             : 
     254        9374 :         io->writer_error[i] = 0; /* NOTE(review): indexed by writer here, but by error class (< IO_WRITER_ERROR_MAX) in the rest of this file - confirm it is intended and in range */
     255             : 
     256             :         /* do the work with a direct call, unless the task is empty */
     257        9374 :         if (task->state != TASK_STATE_EMPTY)
     258        9374 :                 worker->func(worker, task);
     259             : 
     260             :         /* return the position, that is the writer index */
     261        9374 :         *pos = i;
     262             : 
     263             :         /* store the waiting index, always only the one just processed */
     264        9374 :         waiting_map[0] = i;
     265        9374 :         *waiting_mac = 1;
     266        9374 : }
     267             : 
     268           2 : static void io_start_mono(struct snapraid_io* io,
     269             :         block_off_t blockstart, block_off_t blockmax,
     270             :         bit_vect_t* block_enabled)
     271             : {
     272           2 :         io->block_start = blockstart;
     273           2 :         io->block_max = blockmax;
     274           2 :         io->block_enabled = block_enabled;
     275           2 :         io->block_next = blockstart; /* the position from where io_position_next() starts */
     276           2 : }
     277             : 
     278           2 : static void io_stop_mono(struct snapraid_io* io)
     279             : {
     280             :         (void)io; /* intentionally empty: nothing is done with the io here */
     281           2 : }
     282             : 
     283             : /*****************************************************************************/
     284             : /* multi thread */
     285             : 
     286             : /* disable multithread if pthread is not present */
     287             : #if HAVE_THREAD
     288             : 
     289             : /**
     290             :  * Get the next task to work on for a reader, or 0 if the worker has to exit.
     291             :  *
     292             :  * This is the synchronization point for workers with the io.
     293             :  */
     294     1215240 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
     295             : {
     296     1215240 :         struct snapraid_io* io = worker->io;
     297             : 
     298             :         /* the synchronization is protected by the io mutex */
     299     1215240 :         thread_mutex_lock(&io->io_mutex);
     300             : 
     301     1007641 :         while (1) {
     302             :                 unsigned next_index;
     303             : 
     304             :                 /* check if the worker has to exit */
     305             :                 /* even if there is work to do (the writer instead first completes the pending work) */
     306     2222881 :                 if (io->done) {
     307         708 :                         thread_mutex_unlock(&io->io_mutex);
     308         708 :                         return 0;
     309             :                 }
     310             : 
     311             :                 /* get the next pending task, as index in the circular queue of io_max slots */
     312     2222173 :                 next_index = (worker->index + 1) % io->io_max;
     313             : 
     314             :                 /* if the queue of pending tasks is not empty */
     315     2222173 :                 if (next_index != io->reader_index) {
     316             :                         struct snapraid_task* task;
     317             : 
     318             :                         /* the index that the IO may be waiting for */
     319     1214532 :                         unsigned waiting_index = io->reader_index;
     320             : 
     321             :                         /* the index that worker just completed */
     322     1214532 :                         unsigned done_index = worker->index;
     323             : 
     324             :                         /* get the new working task */
     325     1214532 :                         worker->index = next_index;
     326     1214532 :                         task = &worker->task_map[worker->index];
     327             : 
     328             :                         /* if the just completed task is at this index */
     329     1214532 :                         if (done_index == waiting_index) {
     330             :                                 /* notify the IO that a new read is complete */
     331         845 :                                 thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
     332             :                         } else {
     333     1213687 :                                 thread_mutex_unlock(&io->io_mutex);
     334             :                         }
     335             : 
     336             :                         /* return the new task, with the mutex released in both the above cases */
     337     1214532 :                         return task;
     338             :                 }
     339             : 
     340             :                 /* otherwise wait for a read_sched event, and then check again */
     341     1007641 :                 thread_cond_wait(&io->read_sched, &io->io_mutex);
     342             :         }
     343             : }
     344             : 
     345             : /**
     346             :  * Get the next task to work on for a writer, or 0 if the worker has to exit.
     347             :  *
     348             :  * This is the synchronization point for workers with the io.
     349             :  */
     350      571319 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
     351             : {
     352      571319 :         struct snapraid_io* io = worker->io;
     353             :         int error_index;
     354             : 
     355             :         /* the synchronization is protected by the io mutex */
     356      571319 :         thread_mutex_lock(&io->io_mutex);
     357             : 
     358             :         /* counts the number of errors in the global state, if 'state' maps to one of the error classes */
     359      571319 :         error_index = state - IO_WRITER_ERROR_BASE;
     360      571319 :         if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
     361           0 :                 ++io->writer_error[error_index];
     362             : 
     363      550697 :         while (1) {
     364             :                 unsigned next_index;
     365             : 
     366             :                 /* get the next pending task, as index in the circular queue of io_max slots */
     367     1122016 :                 next_index = (worker->index + 1) % io->io_max;
     368             : 
     369             :                 /* if the queue of pending tasks is not empty */
     370     1122016 :                 if (next_index != io->writer_index) {
     371             :                         struct snapraid_task* task;
     372             : 
     373             :                         /* the index that the IO may be waiting for */
     374      570814 :                         unsigned waiting_index = (io->writer_index + 1) % io->io_max;
     375             : 
     376             :                         /* the index that worker just completed */
     377      570814 :                         unsigned done_index = worker->index;
     378             : 
     379             :                         /* get the new working task */
     380      570814 :                         worker->index = next_index;
     381      570814 :                         task = &worker->task_map[worker->index];
     382             : 
     383             :                         /* if the just completed task is at this index */
     384      570814 :                         if (done_index == waiting_index) {
     385             :                                 /* notify the IO that a new write is complete */
     386        4377 :                                 thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
     387             :                         } else {
     388      566437 :                                 thread_mutex_unlock(&io->io_mutex);
     389             :                         }
     390             : 
     391             :                         /* return the new task, with the mutex released in both the above cases */
     392      570814 :                         return task;
     393             :                 }
     394             : 
     395             :                 /* check if the worker has to exit */
     396             :                 /* but only if there is no work to do (the check is done after the queue one) */
     397      551202 :                 if (io->done) {
     398         505 :                         thread_mutex_unlock(&io->io_mutex);
     399         505 :                         return 0;
     400             :                 }
     401             : 
     402             :                 /* otherwise wait for a write_sched event, and then check again */
     403      550697 :                 thread_cond_wait(&io->write_sched, &io->io_mutex);
     404             :         }
     405             : }
     406             : 
     407             : /**
     408             :  * Get the next block position to operate on, and schedule a new read for the workers.
     409             :  *
     410             :  * This is the synchronization point for workers with the io.
     411             :  */
     412      152272 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
     413             : {
     414             :         block_off_t blockcur_schedule;
     415             :         block_off_t blockcur_caller;
     416             :         unsigned i;
     417             : 
     418             :         /* get the next parity position to process */
     419      152272 :         blockcur_schedule = io_position_next(io);
     420             : 
     421             :         /* ensure that all data/parity was read */
     422      152272 :         assert(io->reader_list[0] == io->reader_max);
     423             : 
     424             :         /* setup the list of workers to process, with reader_max itself as last entry */
     425     1430144 :         for (i = 0; i <= io->reader_max; ++i)
     426     1277872 :                 io->reader_list[i] = i;
     427             : 
     428             :         /* the synchronization is protected by the io mutex */
     429      152272 :         thread_mutex_lock(&io->io_mutex);
     430             : 
     431             :         /* schedule the next read, in the slot at the present reader index */
     432      152272 :         io_reader_sched(io, io->reader_index, blockcur_schedule);
     433             : 
     434             :         /* set the index for the tasks to return to the caller */
     435      152272 :         io->reader_index = (io->reader_index + 1) % io->io_max;
     436             : 
     437             :         /* get the position to operate at high level from one task, in the slot following the one just scheduled */
     438      152272 :         blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
     439             : 
     440             :         /* set the buffer to use, the one of the same slot */
     441      152272 :         *buffer = io->buffer_map[io->reader_index];
     442             : 
     443             :         /* signal all the workers that there is a new pending task */
     444      152272 :         thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
     445             : 
     446      152272 :         return blockcur_caller;
     447             : }
     448             : 
     449      116854 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
     450             : {
     451             :         (void)io; /* intentionally empty: the write is scheduled by io_write_next_thread() */
     452             :         (void)blockcur;
     453             :         (void)skip;
     454      116854 : }
     455             : 
     456      116854 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     457             : {
     458             :         unsigned i;
     459             : 
     460             :         /* ensure that all parity was written */
     461      116854 :         assert(io->writer_list[0] == io->writer_max);
     462             : 
     463             :         /* setup the list of workers to process, with writer_max itself as last entry */
     464      804522 :         for (i = 0; i <= io->writer_max; ++i)
     465      687668 :                 io->writer_list[i] = i;
     466             : 
     467             :         /* the synchronization is protected by the io mutex */
     468      116854 :         thread_mutex_lock(&io->io_mutex);
     469             : 
     470             :         /* report errors, and clear the counters for the next call */
     471      584270 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
     472      467416 :                 writer_error[i] = io->writer_error[i];
     473      467416 :                 io->writer_error[i] = 0;
     474             :         }
     475             : 
     476      116854 :         if (skip) {
     477             :                 /* skip the next write, scheduling empty tasks */
     478         921 :                 io_writer_sched_empty(io, io->writer_index, blockcur);
     479             :         } else {
     480             :                 /* schedule the next write, in the slot at the present writer index */
     481      115933 :                 io_writer_sched(io, io->writer_index, blockcur);
     482             :         }
     483             : 
     484             :         /* at this point the writers must be in sync with the readers */
     485      116854 :         assert(io->writer_index == io->reader_index);
     486             : 
     487             :         /* set the index to be used for the next write */
     488      116854 :         io->writer_index = (io->writer_index + 1) % io->io_max;
     489             : 
     490             :         /* signal all the workers that there is a new pending task */
     491      116854 :         thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
     492      116854 : }
     493             : 
     494           0 : static void io_refresh_thread(struct snapraid_io* io)
     495             : {
     496             :         unsigned i;
     497             : 
     498             :         /* the synchronization is protected by the io mutex */
     499           0 :         thread_mutex_lock(&io->io_mutex);
     500             : 
     501             :         /* for all readers, count the number of read blocks and store it as cached_blocks */
     502           0 :         for (i = 0; i < io->reader_max; ++i) {
     503             :                 unsigned begin, end, cached;
     504           0 :                 struct snapraid_worker* worker = &io->reader_map[i];
     505             : 
     506             :                 /* the first block read */
     507           0 :                 begin = io->reader_index + 1;
     508             :                 /* the block in reading */
     509           0 :                 end = worker->index;
     510           0 :                 if (begin > end) /* the worker index wrapped around the circular queue */
     511           0 :                         end += io->io_max;
     512           0 :                 cached = end - begin;
     513             : 
     514           0 :                 if (worker->parity_handle) /* a reader stores the count either in its parity level or in its disk */
     515           0 :                         io->state->parity[worker->parity_handle->level].cached_blocks = cached;
     516             :                 else
     517           0 :                         worker->handle->disk->cached_blocks = cached; /* NOTE(review): assumes a reader without parity_handle always has a handle - confirm */
     518             :         }
     519             : 
     520             :         /* for all writers, count the number of written blocks */
     521             :         /* note that this is a kind of "opposite" of cached blocks */
     522           0 :         for (i = 0; i < io->writer_max; ++i) {
     523             :                 unsigned begin, end, cached;
     524           0 :                 struct snapraid_worker* worker = &io->writer_map[i];
     525             : 
     526             :                 /* the first block written */
     527           0 :                 begin = io->writer_index + 1;
     528             :                 /* the block in writing */
     529           0 :                 end = worker->index;
     530           0 :                 if (begin > end) /* the worker index wrapped around the circular queue */
     531           0 :                         end += io->io_max;
     532           0 :                 cached = end - begin;
     533             : 
     534           0 :                 io->state->parity[worker->parity_handle->level].cached_blocks = cached; /* NOTE(review): assumes every writer has a parity_handle - confirm */
     535             :         }
     536             : 
     537           0 :         thread_mutex_unlock(&io->io_mutex);
     538           0 : }
     539             : 
     540      148094 : static void io_flush_thread(struct snapraid_io* io)
     541             : {
     542             :         unsigned i;
     543             : 
     544      148003 :         while (1) {
     545      148094 :                 int all_done = 1;
     546             : 
     547             :                 /* the synchronization is protected by the io mutex */
     548      148094 :                 thread_mutex_lock(&io->io_mutex);
     549             : 
     550      196697 :                 for (i = 0; i < io->writer_max; ++i) {
     551      196606 :                         struct snapraid_worker* worker = &io->writer_map[i];
     552             : 
     553             :                         /* get the next pending task, with the same computation done in io_writer_step() */
     554      196606 :                         unsigned next_index = (worker->index + 1) % io->io_max;
     555             : 
     556             :                         /* if the queue of pending tasks is not empty, this writer still has work to do */
     557      196606 :                         if (next_index != io->writer_index) {
     558      148003 :                                 all_done = 0;
     559      148003 :                                 break;
     560             :                         }
     561             :                 }
     562             : 
     563      148094 :                 thread_mutex_unlock(&io->io_mutex);
     564             : 
     565      148094 :                 if (all_done)
     566          91 :                         break;
     567             : 
     568             :                 /* wait for something to complete, yielding with the mutex released and then checking again */
     569      148003 :                 thread_yield();
     570             :         }
     571          91 : }
     572             : /* Return the task at io->reader_index of the first listed worker in [base, base + count) that has finished it, waiting on read_done if none is ready. */
     573     1124892 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     574             : {
     575             :         unsigned waiting_cycle;
     576             :         /* on return *pos is the worker position relative to base; waiting_map[0..*waiting_mac) holds the relative positions found pending on the first scan, and is empty if no wait was needed */
     577             :         /* count the waiting cycle */
     578     1124892 :         waiting_cycle = 0;
     579             : 
     580             :         /* clear the waiting indexes */
     581     1124892 :         *waiting_mac = 0;
     582             : 
     583             :         /* the synchronization is protected by the io mutex */
     584     1124892 :         thread_mutex_lock(&io->io_mutex);
     585             : 
     586         766 :         while (1) {
     587             :                 unsigned char* let;
     588             :                 unsigned busy_index;
     589             : 
     590             :                 /* get the index the IO is using */
     591             :                 /* we must ensure that this index does not have a read in progress */
     592             :                 /* to avoid a concurrent access */
     593     1125658 :                 busy_index = io->reader_index;
     594             :                 /* reader_list is a linked list kept in an array: slot 0 holds the first worker, slot i + 1 the worker following i, and reader_max marks the end */
     595             :                 /* search for a worker that has already finished */
     596     1125658 :                 let = &io->reader_list[0];
     597        3415 :                 while (1) {
     598     1129073 :                         unsigned i = *let;
     599             : 
     600             :                         /* if we are at the end */
     601     1129073 :                         if (i == io->reader_max)
     602         766 :                                 break;
     603             : 
     604             :                         /* if it's in range, otherwise the worker is left in the list untouched */
     605     1128307 :                         if (base <= i && i < base + count) {
     606             :                                 struct snapraid_worker* worker;
     607             : 
     608             :                                 /* if it's the first cycle */
     609     1126993 :                                 if (waiting_cycle == 0) {
     610             :                                         /* store the waiting indexes */
     611     1126166 :                                         waiting_map[(*waiting_mac)++] = i - base;
     612             :                                 }
     613             : 
     614     1126993 :                                 worker = &io->reader_map[i];
     615             : 
     616             :                                 /* if the worker has finished this index */
     617     1126993 :                                 if (busy_index != worker->index) {
     618             :                                         struct snapraid_task* task;
     619             : 
     620     1124892 :                                         task = &worker->task_map[io->reader_index];
     621             : 
     622     1124892 :                                         thread_mutex_unlock(&io->io_mutex);
     623             :                                         /* NOTE(review): the list is edited after releasing the mutex; presumably only the calling thread uses reader_list - confirm */
     624             :                                         /* mark the worker as processed */
     625             :                                         /* setting the previous one to point at the next one */
     626     1124892 :                                         *let = io->reader_list[i + 1];
     627             : 
     628             :                                         /* return the position */
     629     1124892 :                                         *pos = i - base;
     630             : 
     631             :                                         /* on the first cycle, no one is waiting */
     632     1124892 :                                         if (waiting_cycle == 0)
     633     1124126 :                                                 *waiting_mac = 0;
     634             : 
     635     1124892 :                                         return task;
     636             :                                 }
     637             :                         }
     638             : 
     639             :                         /* next position to check */
     640        3415 :                         let = &io->reader_list[i + 1];
     641             :                 }
     642             : 
     643             :                 /* if no worker is ready, wait for an event and then scan the list again from the start */
     644         766 :                 thread_cond_wait(&io->read_done, &io->io_mutex);
     645             : 
     646             :                 /* count the cycles */
     647         766 :                 ++waiting_cycle;
     648             :         }
     649             : }
     650             : /* Threaded io_data_read(): wait for a finished read among the workers in [data_base, data_base + data_count). */
     651      913008 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     652             : {
     653      913008 :         return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     654             : }
     655             : /* Threaded io_parity_read(): wait for a finished read among the workers in [parity_base, parity_base + parity_count). */
     656      211884 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     657             : {
     658      211884 :         return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     659             : }
     660             : /* Threaded io_parity_write(): pick the first listed writer that is not working on the slot following io->writer_index, unlink it and return its position in *pos, waiting on write_done if none is ready. */
     661      570814 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     662             : {
     663             :         unsigned waiting_cycle;
     664             :         /* on return waiting_map[0..*waiting_mac) holds the writers found busy on the first scan, and is empty if no wait was needed */
     665             :         /* count the waiting cycle */
     666      570814 :         waiting_cycle = 0;
     667             : 
     668             :         /* clear the waiting indexes */
     669      570814 :         *waiting_mac = 0;
     670             : 
     671             :         /* the synchronization is protected by the io mutex */
     672      570814 :         thread_mutex_lock(&io->io_mutex);
     673             : 
     674        4367 :         while (1) {
     675             :                 unsigned char* let;
     676             :                 unsigned busy_index;
     677             : 
     678             :                 /* get the next index the IO is going to use */
     679             :                 /* we must ensure that this index does not have a write in progress */
     680             :                 /* to avoid a concurrent access */
     681             :                 /* note that we are already sure that a write is not in progress */
     682             :                 /* at the index the IO is using right now */
     683      575181 :                 busy_index = (io->writer_index + 1) % io->io_max;
     684             :                 /* writer_list is a linked list kept in an array: slot 0 holds the first writer, slot i + 1 the writer following i, and writer_max marks the end */
     685             :                 /* search for a worker that has already finished */
     686      575181 :                 let = &io->writer_list[0];
     687        4367 :                 while (1) {
     688      579548 :                         unsigned i = *let;
     689             :                         struct snapraid_worker* worker;
     690             : 
     691             :                         /* if we are at the end */
     692      579548 :                         if (i == io->writer_max)
     693        4367 :                                 break;
     694             : 
     695             :                         /* if it's the first cycle */
     696      575181 :                         if (waiting_cycle == 0) {
     697             :                                 /* store the waiting indexes */
     698      570814 :                                 waiting_map[(*waiting_mac)++] = i;
     699             :                         }
     700             : 
     701      575181 :                         worker = &io->writer_map[i];
     702             : 
     703             :                         /* the two indexes cannot be equal */
     704      575181 :                         assert(io->writer_index != worker->index);
     705             : 
     706             :                         /* if the worker has finished this index */
     707      575181 :                         if (busy_index != worker->index) {
     708      570814 :                                 thread_mutex_unlock(&io->io_mutex);
     709             :                                 /* NOTE(review): the list is edited after releasing the mutex; presumably only the calling thread uses writer_list - confirm */
     710             :                                 /* mark the worker as processed */
     711             :                                 /* setting the previous one to point at the next one */
     712      570814 :                                 *let = io->writer_list[i + 1];
     713             : 
     714             :                                 /* return the position */
     715      570814 :                                 *pos = i;
     716             : 
     717             :                                 /* on the first cycle, no one is waiting */
     718      570814 :                                 if (waiting_cycle == 0)
     719      566447 :                                         *waiting_mac = 0;
     720             : 
     721      570814 :                                 return;
     722             :                         }
     723             : 
     724             :                         /* next position to check */
     725        4367 :                         let = &io->writer_list[i + 1];
     726             :                 }
     727             : 
     728             :                 /* if no worker is ready, wait for an event and then scan the list again from the start */
     729        4367 :                 thread_cond_wait(&io->write_done, &io->io_mutex);
     730             : 
     731             :                 /* count the cycles */
     732        4367 :                 ++waiting_cycle;
     733             :         }
     734             : }
     735             : /* Execute one read task: call the worker callback, or just mark the task TASK_STATE_EMPTY when its position is at or past block_max. */
     736     1125030 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
     737             : {
     738             :         /* if we reached the end */
     739     1125030 :         if (task->position >= worker->io->block_max) {
     740             :                 /* complete a dummy task, without calling the worker function */
     741         138 :                 task->state = TASK_STATE_EMPTY;
     742             :         } else {
     743     1124892 :                 worker->func(worker, task);
     744             :         }
     745     1125030 : }
     746             : /* Entry point of a reader thread (arg is its struct snapraid_worker): completes task_map[0], then serves the tasks returned by io_reader_step() until it returns 0. */
     747         708 : static void* io_reader_thread(void* arg)
     748             : {
     749         708 :         struct snapraid_worker* worker = arg;
     750             : 
     751             :         /* force completion of the first task */
     752         708 :         io_reader_worker(worker, &worker->task_map[0]);
     753             : 
     754     1214532 :         while (1) {
     755             :                 struct snapraid_task* task;
     756             : 
     757             :                 /* get the new task */
     758     1215240 :                 task = io_reader_step(worker);
     759             : 
     760             :                 /* if no task, it means to exit */
     761     1215240 :                 if (!task)
     762         708 :                         break;
     763             : 
     764             :                 /* an empty task has nothing to read, go straight to the next step */
     765     1214532 :                 if (task->state == TASK_STATE_EMPTY)
     766       90210 :                         continue;
     767             : 
     768     1124322 :                 assert(task->state == TASK_STATE_READY);
     769             : 
     770             :                 /* work on the assigned task */
     771     1124322 :                 io_reader_worker(worker, task);
     772             :         }
     773             : 
     774         708 :         return 0;
     775             : }
     776             : /* Entry point of a writer thread (arg is its struct snapraid_worker): serves the tasks returned by io_writer_step(), handing back the state of the latest task, until it returns 0. */
     777         505 : static void* io_writer_thread(void* arg)
     778             : {
     779         505 :         struct snapraid_worker* worker = arg;
     780         505 :         int latest_state = TASK_STATE_DONE;
     781             :         /* NOTE(review): latest_state starts as TASK_STATE_DONE, presumably so that the first step reports no failure - confirm in io_writer_step() */
     782      570814 :         while (1) {
     783             :                 struct snapraid_task* task;
     784             : 
     785             :                 /* get the new task, reporting the result of the previous one */
     786      571319 :                 task = io_writer_step(worker, latest_state);
     787             : 
     788             :                 /* if no task, it means to exit */
     789      571319 :                 if (!task)
     790         505 :                         break;
     791             : 
     792             :                 /* an empty task has nothing to write, it's reported as done */
     793      570814 :                 if (task->state == TASK_STATE_EMPTY) {
     794        5526 :                         latest_state = TASK_STATE_DONE;
     795        5526 :                         continue;
     796             :                 }
     797             : 
     798      565288 :                 assert(task->state == TASK_STATE_READY);
     799             : 
     800             :                 /* work on the assigned task */
     801      565288 :                 worker->func(worker, task);
     802             : 
     803             :                 /* save the resulting state */
     804      565288 :                 latest_state = task->state;
     805             :         }
     806             : 
     807         505 :         return 0;
     808             : }
     809             : /* Threaded io_start(): reset the scheduling state for the blocks from blockstart up to blockmax, queue the initial reads and create the reader and writer threads. */
     810         104 : static void io_start_thread(struct snapraid_io* io,
     811             :         block_off_t blockstart, block_off_t blockmax,
     812             :         bit_vect_t* block_enabled)
     813             : {
     814             :         unsigned i;
     815             : 
     816         104 :         io->block_start = blockstart;
     817         104 :         io->block_max = blockmax;
     818         104 :         io->block_enabled = block_enabled;
     819         104 :         io->block_next = blockstart;
     820             : 
     821         104 :         io->done = 0;
     822         104 :         io->reader_index = io->io_max - 1; /* the first io_read_next() is going to set it to 0 */
     823         104 :         io->writer_index = 0;
     824             : 
     825             :         /* clear writer errors */
     826         520 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     827         416 :                 io->writer_error[i] = 0;
     828             : 
     829             :         /* setup the initial read pending tasks, except the latest one, */
     830             :         /* the latest will be initialized at the first io_read_next() call */
     831       13312 :         for (i = 0; i < io->io_max - 1; ++i) {
     832       13208 :                 block_off_t blockcur = io_position_next(io);
     833             : 
     834       13208 :                 io_reader_sched(io, i, blockcur);
     835             :         }
     836             : 
     837             :         /* setup the lists of workers to process: no reader listed, all the writers listed in order */
     838         104 :         io->reader_list[0] = io->reader_max;
     839         713 :         for (i = 0; i <= io->writer_max; ++i)
     840         609 :                 io->writer_list[i] = i;
     841             : 
     842             :         /* start the reader threads */
     843         812 :         for (i = 0; i < io->reader_max; ++i) {
     844         708 :                 struct snapraid_worker* worker = &io->reader_map[i];
     845             : 
     846         708 :                 worker->index = 0;
     847             : 
     848         708 :                 thread_create(&worker->thread, io_reader_thread, worker);
     849             :         }
     850             : 
     851             :         /* start the writer threads */
     852         609 :         for (i = 0; i < io->writer_max; ++i) {
     853         505 :                 struct snapraid_worker* worker = &io->writer_map[i];
     854             :                 /* writers start one position behind writer_index (0), wrapping around */
     855         505 :                 worker->index = io->io_max - 1;
     856             : 
     857         505 :                 thread_create(&worker->thread, io_writer_thread, worker);
     858             :         }
     859         104 : }
     860             : /* Threaded io_stop(): set io->done, wake up the threads waiting on read_sched and write_sched, and join every reader and writer thread. */
     861         104 : static void io_stop_thread(struct snapraid_io* io)
     862             : {
     863             :         unsigned i;
     864             : 
     865         104 :         thread_mutex_lock(&io->io_mutex);
     866             : 
     867             :         /* mark that we are stopping */
     868         104 :         io->done = 1;
     869             : 
     870             :         /* signal all the threads to recognize the new state */
     871         104 :         thread_cond_broadcast(&io->read_sched);
     872         104 :         thread_cond_broadcast(&io->write_sched);
     873             : 
     874         104 :         thread_mutex_unlock(&io->io_mutex);
     875             : 
     876             :         /* wait for all readers to terminate */
     877         812 :         for (i = 0; i < io->reader_max; ++i) {
     878         708 :                 struct snapraid_worker* worker = &io->reader_map[i];
     879             :                 void* retval;
     880             : 
     881             :                 /* wait for thread termination, the returned value is not used */
     882         708 :                 thread_join(worker->thread, &retval);
     883             :         }
     884             : 
     885             :         /* wait for all writers to terminate */
     886         609 :         for (i = 0; i < io->writer_max; ++i) {
     887         505 :                 struct snapraid_worker* worker = &io->writer_map[i];
     888             :                 void* retval;
     889             : 
     890             :                 /* wait for thread termination, the returned value is not used */
     891         505 :                 thread_join(worker->thread, &retval);
     892             :         }
     893         104 : }
     894             : 
     895             : #endif
     896             : 
     897             : /*****************************************************************************/
     898             : /* global */
     899             : /* Initialize the IO context: choose the number of cached positions, allocate the block buffers and the worker tables, and bind the io_* function pointers to the threaded or mono implementation. */
     900         106 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
     901             :         unsigned io_cache, unsigned buffer_max,
     902             :         void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
     903             :         struct snapraid_handle* handle_map, unsigned handle_max,
     904             :         void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
     905             :         void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
     906             :         struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
     907             : {
     908             :         unsigned i;
     909             :         size_t allocated_size;
     910         106 :         size_t block_size = state->block_size;
     911             : 
     912         106 :         io->state = state;
     913             :         /* the caller must provide at least one buffer for each data and parity handle */
     914         106 :         assert(buffer_max >= handle_max + parity_handle_max);
     915             : 
     916             :         /* initialize bandwidth limiting */
     917         106 :         bw_init(&io->bw, state->opt.bwlimit);
     918             : 
     919             :         /* set IO context in handles, all of them share the same bandwidth limiter */
     920         742 :         for (i = 0; i < handle_max; ++i)
     921         636 :                 handle_map[i].bw = &io->bw;
     922         697 :         for (i = 0; i < parity_handle_max; ++i)
     923         591 :                 parity_handle_map[i].bw = &io->bw;
     924             : 
     925             : #if HAVE_THREAD
     926         106 :         if (io_cache == 0) {
     927             :                 /* default is 16 MiB of cache, clamped to the IO_MIN..IO_MAX range of positions */
     928             :                 /* this seems to be a good tradeoff between speed and memory usage */
     929         104 :                 io->io_max = 16 * 1024 * 1024 / state->block_size;
     930         104 :                 if (io->io_max < IO_MIN)
     931           0 :                         io->io_max = IO_MIN;
     932         104 :                 if (io->io_max > IO_MAX)
     933         104 :                         io->io_max = IO_MAX;
     934             :         } else {
     935           2 :                 io->io_max = io_cache;
     936             :         }
     937             : #else
     938             :         (void)io_cache;
     939             : 
     940             :         /* without pthread force the mono thread mode */
     941             :         io->io_max = 1;
     942             : #endif
     943             :         /* an explicit io_cache is not clamped, so it must already be 1 or inside the allowed range */
     944         106 :         assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
     945             :         /* allocate one vector of buffers for each cached position; accounted below as buffer_max blocks of block_size bytes */
     946         106 :         io->buffer_max = buffer_max;
     947         106 :         allocated_size = 0;
     948       13420 :         for (i = 0; i < io->io_max; ++i) {
     949       13314 :                 if (state->file_mode != ADVISE_DIRECT)
     950       13186 :                         io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
     951             :                 else
     952         128 :                         io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, block_size, &io->buffer_alloc_map[i]);
     953       13314 :                 if (!state->opt.skip_self)
     954           0 :                         mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
     955       13314 :                 allocated_size += block_size * buffer_max;
     956             :         }
     957             : 
     958         106 :         msg_progress("Using %u MiB of memory for %u cached blocks.\n", (unsigned)(allocated_size / MEBI), io->io_max);
     959             :         /* with a parity writer the parity handles get writer workers, otherwise they get additional reader workers */
     960         106 :         if (parity_writer) {
     961          92 :                 io->reader_max = handle_max;
     962          92 :                 io->writer_max = parity_handle_max;
     963             :         } else {
     964          14 :                 io->reader_max = handle_max + parity_handle_max;
     965          14 :                 io->writer_max = 0;
     966             :         }
     967             :         /* each list gets one extra slot; NOTE(review): only one byte per entry is allocated, so worker counts presumably fit in a byte - confirm */
     968         106 :         io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
     969         106 :         io->reader_list = malloc_nofail(io->reader_max + 1);
     970         106 :         io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
     971         106 :         io->writer_list = malloc_nofail(io->writer_max + 1);
     972             :         /* reader positions: data workers come first, parity workers follow them */
     973         106 :         io->data_base = 0;
     974         106 :         io->data_count = handle_max;
     975         106 :         io->parity_base = handle_max;
     976         106 :         io->parity_count = parity_handle_max;
     977             : 
     978         826 :         for (i = 0; i < io->reader_max; ++i) {
     979         720 :                 struct snapraid_worker* worker = &io->reader_map[i];
     980             : 
     981         720 :                 worker->io = io;
     982             : 
     983         720 :                 if (i < handle_max) {
     984             :                         /* it's a data read */
     985         636 :                         worker->handle = &handle_map[i];
     986         636 :                         worker->parity_handle = 0;
     987         636 :                         worker->func = data_reader;
     988             : 
     989             :                         /* data read is put in lower buffer index */
     990         636 :                         worker->buffer_skew = 0;
     991             :                 } else {
     992             :                         /* it's a parity read, reached only when there is no parity writer */
     993          84 :                         worker->handle = 0;
     994          84 :                         worker->parity_handle = &parity_handle_map[i - handle_max];
     995          84 :                         worker->func = parity_reader;
     996             : 
     997             :                         /* parity read is put after data and computed parity */
     998          84 :                         worker->buffer_skew = parity_handle_max;
     999             :                 }
    1000             :         }
    1001             : 
    1002         613 :         for (i = 0; i < io->writer_max; ++i) {
    1003         507 :                 struct snapraid_worker* worker = &io->writer_map[i];
    1004             : 
    1005         507 :                 worker->io = io;
    1006             : 
    1007             :                 /* it's a parity write */
    1008         507 :                 worker->handle = 0;
    1009         507 :                 worker->parity_handle = &parity_handle_map[i];
    1010         507 :                 worker->func = parity_writer;
    1011             : 
    1012             :                 /* parity to write is put after data */
    1013         507 :                 worker->buffer_skew = handle_max;
    1014             :         }
    1015             :         /* bind the global io_* function pointers: threaded versions with more than one cached position, mono versions otherwise */
    1016             : #if HAVE_THREAD
    1017         106 :         if (io->io_max > 1) {
    1018         104 :                 io_read_next = io_read_next_thread;
    1019         104 :                 io_write_preset = io_write_preset_thread;
    1020         104 :                 io_write_next = io_write_next_thread;
    1021         104 :                 io_refresh = io_refresh_thread;
    1022         104 :                 io_flush = io_flush_thread;
    1023         104 :                 io_data_read = io_data_read_thread;
    1024         104 :                 io_parity_read = io_parity_read_thread;
    1025         104 :                 io_parity_write = io_parity_write_thread;
    1026         104 :                 io_start = io_start_thread;
    1027         104 :                 io_stop = io_stop_thread;
    1028             :                 /* the synchronization objects are created only in threaded mode */
    1029         104 :                 thread_mutex_init(&io->io_mutex);
    1030         104 :                 thread_cond_init(&io->read_done);
    1031         104 :                 thread_cond_init(&io->read_sched);
    1032         104 :                 thread_cond_init(&io->write_done);
    1033         104 :                 thread_cond_init(&io->write_sched);
    1034             :         } else
    1035             : #endif
    1036             :         {
    1037           2 :                 io_read_next = io_read_next_mono;
    1038           2 :                 io_write_preset = io_write_preset_mono;
    1039           2 :                 io_write_next = io_write_next_mono;
    1040           2 :                 io_refresh = io_refresh_mono;
    1041           2 :                 io_flush = io_flush_mono;
    1042           2 :                 io_data_read = io_data_read_mono;
    1043           2 :                 io_parity_read = io_parity_read_mono;
    1044           2 :                 io_parity_write = io_parity_write_mono;
    1045           2 :                 io_start = io_start_mono;
    1046           2 :                 io_stop = io_stop_mono;
    1047             :         }
    1048         106 : }
    1049             : /* Release what io_init() set up: the per-position buffers, the worker tables and lists, and in threaded mode the mutex and the condition variables. */
    1050         106 : void io_done(struct snapraid_io* io)
    1051             : {
    1052             :         unsigned i;
    1053             : 
    1054       13420 :         for (i = 0; i < io->io_max; ++i) {
    1055       13314 :                 free(io->buffer_map[i]);
    1056       13314 :                 free(io->buffer_alloc_map[i]);
    1057             :         }
    1058             : 
    1059         106 :         free(io->reader_map);
    1060         106 :         free(io->reader_list);
    1061         106 :         free(io->writer_map);
    1062         106 :         free(io->writer_list);
    1063             :         /* same io_max > 1 condition used by io_init() to create the synchronization objects */
    1064             : #if HAVE_THREAD
    1065         106 :         if (io->io_max > 1) {
    1066         104 :                 thread_mutex_destroy(&io->io_mutex);
    1067         104 :                 thread_cond_destroy(&io->read_done);
    1068         104 :                 thread_cond_destroy(&io->read_sched);
    1069         104 :                 thread_cond_destroy(&io->write_done);
    1070         104 :                 thread_cond_destroy(&io->write_sched);
    1071             :         }
    1072             : #endif
    1073         106 : }
    1074             : 

Generated by: LCOV version 1.0