LCOV - code coverage report
Current view: top level - cmdline - io.c (source / functions) Hit Total Coverage
Test: lcov.info Lines: 407 417 97.6 %
Date: 2017-11-06 22:14:04 Functions: 29 31 93.5 %

          Line data    Source code
       1             : /*
       2             :  * Copyright (C) 2016 Andrea Mazzoleni
       3             :  *
       4             :  * This program is free software: you can redistribute it and/or modify
       5             :  * it under the terms of the GNU General Public License as published by
       6             :  * the Free Software Foundation, either version 3 of the License, or
       7             :  * (at your option) any later version.
       8             :  *
       9             :  * This program is distributed in the hope that it will be useful,
      10             :  * but WITHOUT ANY WARRANTY; without even the implied warranty of
      11             :  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      12             :  * GNU General Public License for more details.
      13             :  *
      14             :  * You should have received a copy of the GNU General Public License
      15             :  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
      16             :  */
      17             : 
      18             : #include "portable.h"
      19             : 
      20             : #include "io.h"
      21             : 
      22             : /**
      23             :  * Get the next block position to process.
      24             :  */
      25      162051 : static block_off_t io_position_next(struct snapraid_io* io)
      26             : {
      27             :         block_off_t blockcur;
      28             : 
      29             :         /* get the next position */
      30      647484 :         while (io->block_next < io->block_max && !io->block_is_enabled(io->block_arg, io->block_next))
      31      323382 :                 ++io->block_next;
      32             : 
      33      162051 :         blockcur = io->block_next;
      34             : 
      35             :         /* next block for the next call */
      36      162051 :         ++io->block_next;
      37             : 
      38      162051 :         return blockcur;
      39             : }
      40             : 
      41             : /**
      42             :  * Setup the next pending task for all readers.
      43             :  */
      44      162051 : static void io_reader_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      45             : {
      46             :         unsigned i;
      47             : 
      48     1355145 :         for (i = 0; i < io->reader_max; ++i) {
      49     1193094 :                 struct snapraid_worker* worker = &io->reader_map[i];
      50     1193094 :                 struct snapraid_task* task = &worker->task_map[task_index];
      51             : 
      52             :                 /* setup the new pending task */
      53     1193094 :                 if (blockcur < io->block_max)
      54     1110144 :                         task->state = TASK_STATE_READY;
      55             :                 else
      56       82950 :                         task->state = TASK_STATE_EMPTY;
      57             : 
      58     1193094 :                 task->path[0] = 0;
      59     1193094 :                 if (worker->handle)
      60      972306 :                         task->disk = worker->handle->disk;
      61             :                 else
      62      220788 :                         task->disk = 0;
      63     1193094 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      64     1193094 :                 task->position = blockcur;
      65     1193094 :                 task->block = 0;
      66     1193094 :                 task->file = 0;
      67     1193094 :                 task->file_pos = 0;
      68     1193094 :                 task->read_size = 0;
      69     1193094 :                 task->is_timestamp_different = 0;
      70             :         }
      71      162051 : }
      72             : 
      73             : /**
      74             :  * Setup the next pending task for all writers.
      75             :  */
      76      112360 : static void io_writer_sched(struct snapraid_io* io, int task_index, block_off_t blockcur)
      77             : {
      78             :         unsigned i;
      79             : 
      80      656415 :         for (i = 0; i < io->writer_max; ++i) {
      81      544055 :                 struct snapraid_worker* worker = &io->writer_map[i];
      82      544055 :                 struct snapraid_task* task = &worker->task_map[task_index];
      83             : 
      84             :                 /* setup the new pending task */
      85      544055 :                 task->state = TASK_STATE_READY;
      86      544055 :                 task->path[0] = 0;
      87      544055 :                 task->disk = 0;
      88      544055 :                 task->buffer = io->buffer_map[task_index][worker->buffer_skew + i];
      89      544055 :                 task->position = blockcur;
      90      544055 :                 task->block = 0;
      91      544055 :                 task->file = 0;
      92      544055 :                 task->file_pos = 0;
      93      544055 :                 task->read_size = 0;
      94      544055 :                 task->is_timestamp_different = 0;
      95             :         }
      96      112360 : }
      97             : 
      98             : /**
      99             :  * Setup an empty next pending task for all writers.
     100             :  */
     101        2396 : static void io_writer_sched_empty(struct snapraid_io* io, int task_index, block_off_t blockcur)
     102             : {
     103             :         unsigned i;
     104             : 
     105       16772 :         for (i = 0; i < io->writer_max; ++i) {
     106       14376 :                 struct snapraid_worker* worker = &io->writer_map[i];
     107       14376 :                 struct snapraid_task* task = &worker->task_map[task_index];
     108             : 
     109             :                 /* setup the new pending task */
     110       14376 :                 task->state = TASK_STATE_EMPTY;
     111       14376 :                 task->path[0] = 0;
     112       14376 :                 task->disk = 0;
     113       14376 :                 task->buffer = 0;
     114       14376 :                 task->position = blockcur;
     115       14376 :                 task->block = 0;
     116       14376 :                 task->file = 0;
     117       14376 :                 task->file_pos = 0;
     118       14376 :                 task->read_size = 0;
     119       14376 :                 task->is_timestamp_different = 0;
     120             :         }
     121        2396 : }
     122             : 
     123             : /*****************************************************************************/
     124             : /* mono thread */
     125             : 
     126        4688 : static block_off_t io_read_next_mono(struct snapraid_io* io, void*** buffer)
     127             : {
     128             :         block_off_t blockcur_schedule;
     129             : 
     130             :         /* reset the index */
     131        4688 :         io->reader_index = 0;
     132             : 
     133        4688 :         blockcur_schedule = io_position_next(io);
     134             : 
     135             :         /* schedule the next read */
     136        4688 :         io_reader_sched(io, 0, blockcur_schedule);
     137             : 
     138             :         /* set the buffer to use */
     139        4688 :         *buffer = io->buffer_map[0];
     140             : 
     141        4688 :         return blockcur_schedule;
     142             : }
     143             : 
     144        4687 : static void io_write_preset_mono(struct snapraid_io* io, block_off_t blockcur, int skip)
     145             : {
     146             :         unsigned i;
     147             : 
     148             :         /* reset the index */
     149        4687 :         io->writer_index = 0;
     150             : 
     151             :         /* clear errors */
     152       23435 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     153       18748 :                 io->writer_error[i] = 0;
     154             : 
     155        4687 :         if (skip) {
     156             :                 /* skip the next write */
     157           0 :                 io_writer_sched_empty(io, 0, blockcur);
     158             :         } else {
     159             :                 /* schedule the next write */
     160        4687 :                 io_writer_sched(io, 0, blockcur);
     161             :         }
     162        4687 : }
     163             : 
     164        4687 : static void io_write_next_mono(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     165             : {
     166             :         unsigned i;
     167             : 
     168             :         (void)blockcur;
     169             :         (void)skip;
     170             : 
     171             :         /* report errors */
     172       23435 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     173       18748 :                 writer_error[i] = io->writer_error[i];
     174        4687 : }
     175             : 
     176           0 : static void io_refresh_mono(struct snapraid_io* io)
     177             : {
     178             :         (void)io;
     179           0 : }
     180             : 
     181       28122 : static struct snapraid_task* io_task_read_mono(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     182             : {
     183             :         struct snapraid_worker* worker;
     184             :         struct snapraid_task* task;
     185             :         unsigned i;
     186             : 
     187             :         /* get the next task */
     188       28122 :         i = io->reader_index++;
     189             : 
     190       28122 :         assert(base <= i && i < base + count);
     191             : 
     192       28122 :         worker = &io->reader_map[i];
     193       28122 :         task = &worker->task_map[0];
     194             : 
     195             :         /* do the work */
     196       28122 :         if (task->state != TASK_STATE_EMPTY)
     197       28122 :                 worker->func(worker, task);
     198             : 
     199             :         /* return the position */
     200       28122 :         *pos = i - base;
     201             : 
     202             :         /* store the waiting index */
     203       28122 :         waiting_map[0] = i - base;
     204       28122 :         *waiting_mac = 1;
     205             : 
     206       28122 :         return task;
     207             : }
     208             : 
     209       28122 : static struct snapraid_task* io_data_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     210             : {
     211       28122 :         return io_task_read_mono(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     212             : }
     213             : 
     214           0 : static struct snapraid_task* io_parity_read_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     215             : {
     216           0 :         return io_task_read_mono(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     217             : }
     218             : 
     219        4687 : static void io_parity_write_mono(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     220             : {
     221             :         struct snapraid_worker* worker;
     222             :         struct snapraid_task* task;
     223             :         unsigned i;
     224             : 
     225             :         /* get the next task */
     226        4687 :         i = io->writer_index++;
     227             : 
     228        4687 :         worker = &io->writer_map[i];
     229        4687 :         task = &worker->task_map[0];
     230             : 
     231        4687 :         io->writer_error[i] = 0;
     232             : 
     233             :         /* do the work */
     234        4687 :         if (task->state != TASK_STATE_EMPTY)
     235        4687 :                 worker->func(worker, task);
     236             : 
     237             :         /* return the position */
     238        4687 :         *pos = i;
     239             : 
     240             :         /* store the waiting index */
     241        4687 :         waiting_map[0] = i;
     242        4687 :         *waiting_mac = 1;
     243        4687 : }
     244             : 
     245           1 : static void io_start_mono(struct snapraid_io* io,
     246             :         block_off_t blockstart, block_off_t blockmax,
     247             :         int (*block_is_enabled)(void* arg, block_off_t), void* blockarg)
     248             : {
     249           1 :         io->block_start = blockstart;
     250           1 :         io->block_max = blockmax;
     251           1 :         io->block_is_enabled = block_is_enabled;
     252           1 :         io->block_arg = blockarg;
     253           1 :         io->block_next = blockstart;
     254           1 : }
     255             : 
     256           1 : static void io_stop_mono(struct snapraid_io* io)
     257             : {
     258             :         (void)io;
     259           1 : }
     260             : 
     261             : /*****************************************************************************/
     262             : /* multi thread */
     263             : 
     264             : /* disable multithread if pthread is not present */
     265             : #if HAVE_PTHREAD
     266             : 
     267             : /**
     268             :  * Get the next task to work on for a reader.
     269             :  *
     270             :  * This is the synchronization point for workers with the io.
     271             :  */
     272     1155859 : static struct snapraid_task* io_reader_step(struct snapraid_worker* worker)
     273             : {
     274     1155859 :         struct snapraid_io* io = worker->io;
     275             : 
     276             :         /* the synchronization is protected by the io mutex */
     277     1155859 :         thread_mutex_lock(&io->io_mutex);
     278             : 
     279             :         while (1) {
     280             :                 unsigned next_index;
     281             : 
     282             :                 /* check if the worker has to exit */
     283             :                 /* even if there is work to do */
     284     2044739 :                 if (io->done) {
     285         648 :                         thread_mutex_unlock(&io->io_mutex);
     286         648 :                         return 0;
     287             :                 }
     288             : 
     289             :                 /* get the next pending task */
     290     2044091 :                 next_index = (worker->index + 1) % io->io_max;
     291             : 
     292             :                 /* if the queue of pending tasks is not empty */
     293     2044091 :                 if (next_index != io->reader_index) {
     294             :                         struct snapraid_task* task;
     295             : 
     296             :                         /* the index that the IO may be waiting for */
     297     1163889 :                         unsigned waiting_index = io->reader_index;
     298             : 
     299             :                         /* the index that worker just completed */
     300     1163889 :                         unsigned done_index = worker->index;
     301             : 
     302             :                         /* get the new working task */
     303     1163889 :                         worker->index = next_index;
     304     1163889 :                         task = &worker->task_map[worker->index];
     305             : 
     306             :                         /* if the just completed task is at this index */
     307     1163889 :                         if (done_index == waiting_index) {
     308             :                                 /* notify the IO that a new read is complete */
     309        2053 :                                 thread_cond_signal_and_unlock(&io->read_done, &io->io_mutex);
     310             :                         } else {
     311     1161836 :                                 thread_mutex_unlock(&io->io_mutex);
     312             :                         }
     313             : 
     314             :                         /* return the new task */
     315     1163286 :                         return task;
     316             :                 }
     317             : 
     318             :                 /* otherwise wait for a read_sched event */
     319      880202 :                 thread_cond_wait(&io->read_sched, &io->io_mutex);
     320      880202 :         }
     321             : }
     322             : 
     323             : /**
     324             :  * Get the next task to work on for a writer.
     325             :  *
     326             :  * This is the synchronization point for workers with the io.
     327             :  */
     328      550861 : static struct snapraid_task* io_writer_step(struct snapraid_worker* worker, int state)
     329             : {
     330      550861 :         struct snapraid_io* io = worker->io;
     331             :         int error_index;
     332             : 
     333             :         /* the synchronization is protected by the io mutex */
     334      550861 :         thread_mutex_lock(&io->io_mutex);
     335             : 
     336             :         /* counts the number of errors in the global state */
     337      554206 :         error_index = state - IO_WRITER_ERROR_BASE;
     338      554206 :         if (error_index >= 0 && error_index < IO_WRITER_ERROR_MAX)
     339           0 :                 ++io->writer_error[error_index];
     340             : 
     341             :         while (1) {
     342             :                 unsigned next_index;
     343             : 
     344             :                 /* get the next pending task */
     345      980523 :                 next_index = (worker->index + 1) % io->io_max;
     346             : 
     347             :                 /* if the queue of pending tasks is not empty */
     348      980523 :                 if (next_index != io->writer_index) {
     349             :                         struct snapraid_task* task;
     350             : 
     351             :                         /* the index that the IO may be waiting for */
     352      553744 :                         unsigned waiting_index = (io->writer_index + 1) % io->io_max;
     353             : 
     354             :                         /* the index that worker just completed */
     355      553744 :                         unsigned done_index = worker->index;
     356             : 
     357             :                         /* get the new working task */
     358      553744 :                         worker->index = next_index;
     359      553744 :                         task = &worker->task_map[worker->index];
     360             : 
     361             :                         /* if the just completed task is at this index */
     362      553744 :                         if (done_index == waiting_index) {
     363             :                                 /* notify the IO that a new write is complete */
     364          16 :                                 thread_cond_signal_and_unlock(&io->write_done, &io->io_mutex);
     365             :                         } else {
     366      553728 :                                 thread_mutex_unlock(&io->io_mutex);
     367             :                         }
     368             : 
     369             :                         /* return the new task */
     370      552907 :                         return task;
     371             :                 }
     372             : 
     373             :                 /* check if the worker has to exit */
     374             :                 /* but only if there is no work to do */
     375      426779 :                 if (io->done) {
     376         462 :                         thread_mutex_unlock(&io->io_mutex);
     377         462 :                         return 0;
     378             :                 }
     379             : 
     380             :                 /* otherwise wait for a write_sched event */
     381      426317 :                 thread_cond_wait(&io->write_sched, &io->io_mutex);
     382      426317 :         }
     383             : }
     384             : 
     385             : /**
     386             :  * Get the next block position to operate on.
     387             :  *
     388             :  * This is the synchronization point for workers with the io.
     389             :  */
     390      145298 : static block_off_t io_read_next_thread(struct snapraid_io* io, void*** buffer)
     391             : {
     392             :         block_off_t blockcur_schedule;
     393             :         block_off_t blockcur_caller;
     394             :         unsigned i;
     395             : 
     396             :         /* get the next parity position to process */
     397      145298 :         blockcur_schedule = io_position_next(io);
     398             : 
     399             :         /* ensure that all data/parity was read */
     400      145298 :         assert(io->reader_list[0] == io->reader_max);
     401             : 
     402             :         /* setup the list of workers to process */
     403     1373266 :         for (i = 0; i <= io->reader_max; ++i)
     404     1227968 :                 io->reader_list[i] = i;
     405             : 
     406             :         /* the synchronization is protected by the io mutex */
     407      145298 :         thread_mutex_lock(&io->io_mutex);
     408             : 
     409             :         /* schedule the next read */
     410      145298 :         io_reader_sched(io, io->reader_index, blockcur_schedule);
     411             : 
     412             :         /* set the index for the tasks to return to the caller */
     413      145298 :         io->reader_index = (io->reader_index + 1) % io->io_max;
     414             : 
     415             :         /* get the position to operate at high level from one task */
     416      145298 :         blockcur_caller = io->reader_map[0].task_map[io->reader_index].position;
     417             : 
     418             :         /* set the buffer to use */
     419      145298 :         *buffer = io->buffer_map[io->reader_index];
     420             : 
     421             :         /* signal all the workers that there is a new pending task */
     422      145298 :         thread_cond_broadcast_and_unlock(&io->read_sched, &io->io_mutex);
     423             : 
     424      145298 :         return blockcur_caller;
     425             : }
     426             : 
     427      110069 : static void io_write_preset_thread(struct snapraid_io* io, block_off_t blockcur, int skip)
     428             : {
     429             :         (void)io;
     430             :         (void)blockcur;
     431             :         (void)skip;
     432      110069 : }
     433             : 
     434      110069 : static void io_write_next_thread(struct snapraid_io* io, block_off_t blockcur, int skip, int* writer_error)
     435             : {
     436             :         unsigned i;
     437             : 
     438             :         /* ensure that all parity was written */
     439      110069 :         assert(io->writer_list[0] == io->writer_max);
     440             : 
     441             :         /* setup the list of workers to process */
     442      773882 :         for (i = 0; i <= io->writer_max; ++i)
     443      663813 :                 io->writer_list[i] = i;
     444             : 
     445             :         /* the synchronization is protected by the io mutex */
     446      110069 :         thread_mutex_lock(&io->io_mutex);
     447             : 
     448             :         /* report errors */
     449      550345 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i) {
     450      440276 :                 writer_error[i] = io->writer_error[i];
     451      440276 :                 io->writer_error[i] = 0;
     452             :         }
     453             : 
     454      110069 :         if (skip) {
     455             :                 /* skip the next write */
     456        2396 :                 io_writer_sched_empty(io, io->writer_index, blockcur);
     457             :         } else {
     458             :                 /* schedule the next write */
     459      107673 :                 io_writer_sched(io, io->writer_index, blockcur);
     460             :         }
     461             : 
     462             :         /* at this point the writers must be in sync with the readers */
     463      110069 :         assert(io->writer_index == io->reader_index);
     464             : 
     465             :         /* set the index to be used for the next write */
     466      110069 :         io->writer_index = (io->writer_index + 1) % io->io_max;
     467             : 
     468             :         /* signal all the workers that there is a new pending task */
     469      110069 :         thread_cond_broadcast_and_unlock(&io->write_sched, &io->io_mutex);
     470      110069 : }
     471             : 
     472           1 : static void io_refresh_thread(struct snapraid_io* io)
     473             : {
     474             :         unsigned i;
     475             : 
     476             :         /* the synchronization is protected by the io mutex */
     477           1 :         thread_mutex_lock(&io->io_mutex);
     478             : 
     479             :         /* for all readers, count the number of read blocks */
     480           7 :         for (i = 0; i < io->reader_max; ++i) {
     481             :                 unsigned begin, end, cached;
     482           6 :                 struct snapraid_worker* worker = &io->reader_map[i];
     483             : 
     484             :                 /* the first block read */
     485           6 :                 begin = io->reader_index + 1;
     486             :                 /* the block in reading */
     487           6 :                 end = worker->index;
     488           6 :                 if (begin > end)
     489           6 :                         end += io->io_max;
     490           6 :                 cached = end - begin;
     491             : 
     492           6 :                 if (worker->parity_handle)
     493           0 :                         io->state->parity[worker->parity_handle->level].cached = cached;
     494             :                 else
     495           6 :                         worker->handle->disk->cached = cached;
     496             :         }
     497             : 
     498             :         /* for all writers, count the number of written blocks */
     499             :         /* note that this is a kind of "opposite" of cached blocks */
     500           2 :         for (i = 0; i < io->writer_max; ++i) {
     501             :                 unsigned begin, end, cached;
     502           1 :                 struct snapraid_worker* worker = &io->writer_map[i];
     503             : 
     504             :                 /* the first block written */
     505           1 :                 begin = io->writer_index + 1;
     506             :                 /* the block in writing */
     507           1 :                 end = worker->index;
     508           1 :                 if (begin > end)
     509           1 :                         end += io->io_max;
     510           1 :                 cached = end - begin;
     511             : 
     512           1 :                 io->state->parity[worker->parity_handle->level].cached = cached;
     513             :         }
     514             : 
     515           1 :         thread_mutex_unlock(&io->io_mutex);
     516           1 : }
     517             : 
     518     1082022 : static struct snapraid_task* io_task_read_thread(struct snapraid_io* io, unsigned base, unsigned count, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     519             : {
     520             :         unsigned waiting_cycle;
     521             : 
     522             :         /* count the waiting cycle */
     523     1082022 :         waiting_cycle = 0;
     524             : 
     525             :         /* clear the waiting indexes */
     526     1082022 :         *waiting_mac = 0;
     527             : 
     528             :         /* the synchronization is protected by the io mutex */
     529     1082022 :         thread_mutex_lock(&io->io_mutex);
     530             : 
     531             :         while (1) {
     532             :                 unsigned char* let;
     533             :                 unsigned busy_index;
     534             : 
     535             :                 /* get the index the IO is using */
     536             :                 /* we must ensure that this index has not a read in progress */
     537             :                 /* to avoid a concurrent access */
     538     1083866 :                 busy_index = io->reader_index;
     539             : 
     540             :                 /* search for a worker that has already finished */
     541     1083866 :                 let = &io->reader_list[0];
     542             :                 while (1) {
     543     1091735 :                         unsigned i = *let;
     544             : 
     545             :                         /* if we are at the end */
     546     1091735 :                         if (i == io->reader_max)
     547        1844 :                                 break;
     548             : 
     549             :                         /* if it's in range */
     550     1089891 :                         if (base <= i && i < base + count) {
     551             :                                 struct snapraid_worker* worker;
     552             : 
     553             :                                 /* if it's the first cycle */
     554     1088511 :                                 if (waiting_cycle == 0) {
     555             :                                         /* store the waiting indexes */
     556     1086614 :                                         waiting_map[(*waiting_mac)++] = i - base;
     557             :                                 }
     558             : 
     559     1088511 :                                 worker = &io->reader_map[i];
     560             : 
     561             :                                 /* if the worker has finished this index */
     562     1088511 :                                 if (busy_index != worker->index) {
     563             :                                         struct snapraid_task* task;
     564             : 
     565     1082022 :                                         task = &worker->task_map[io->reader_index];
     566             : 
     567     1082022 :                                         thread_mutex_unlock(&io->io_mutex);
     568             : 
     569             :                                         /* mark the worker as processed */
     570             :                                         /* setting the previous one to point at the next one */
     571     1082022 :                                         *let = io->reader_list[i + 1];
     572             : 
     573             :                                         /* return the position */
     574     1082022 :                                         *pos = i - base;
     575             : 
     576             :                                         /* on the first cycle, no one is waiting */
     577     1082022 :                                         if (waiting_cycle == 0)
     578     1080193 :                                                 *waiting_mac = 0;
     579             : 
     580     2164044 :                                         return task;
     581             :                                 }
     582             :                         }
     583             : 
     584             :                         /* next position to check */
     585        7869 :                         let = &io->reader_list[i + 1];
     586        7869 :                 }
     587             : 
     588             :                 /* if no worker is ready, wait for an event */
     589        1844 :                 thread_cond_wait(&io->read_done, &io->io_mutex);
     590             : 
     591             :                 /* count the cycles */
     592        1844 :                 ++waiting_cycle;
     593        1844 :         }
     594             : }
     595             : 
     596      871218 : static struct snapraid_task* io_data_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     597             : {
     598      871218 :         return io_task_read_thread(io, io->data_base, io->data_count, pos, waiting_map, waiting_mac);
     599             : }
     600             : 
     601      210804 : static struct snapraid_task* io_parity_read_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     602             : {
     603      210804 :         return io_task_read_thread(io, io->parity_base, io->parity_count, pos, waiting_map, waiting_mac);
     604             : }
     605             : 
     606      553744 : static void io_parity_write_thread(struct snapraid_io* io, unsigned* pos, unsigned* waiting_map, unsigned* waiting_mac)
     607             : {
     608             :         unsigned waiting_cycle;
     609             : 
     610             :         /* count the waiting cycle */
     611      553744 :         waiting_cycle = 0;
     612             : 
     613             :         /* clear the waiting indexes */
     614      553744 :         *waiting_mac = 0;
     615             : 
     616             :         /* the synchronization is protected by the io mutex */
     617      553744 :         thread_mutex_lock(&io->io_mutex);
     618             : 
     619             :         while (1) {
     620             :                 unsigned char* let;
     621             :                 unsigned busy_index;
     622             : 
     623             :                 /* get the next index the IO is going to use */
     624             :                 /* we must ensure that this index does not have a write in progress */
     625             :                 /* to avoid a concurrent access */
     626             :                 /* note that we are already sure that a write is not in progress */
     627             :                 /* at the index the IO is using right now */
     628      553754 :                 busy_index = (io->writer_index + 1) % io->io_max;
     629             : 
     630             :                 /* search for a worker that has already finished */
     631      553754 :                 let = &io->writer_list[0];
     632             :                 while (1) {
     633      553785 :                         unsigned i = *let;
     634             :                         struct snapraid_worker* worker;
     635             : 
     636             :                         /* if we are at the end */
     637      553785 :                         if (i == io->writer_max)
     638          10 :                                 break;
     639             : 
     640             :                         /* if it's the first cycle */
     641      553775 :                         if (waiting_cycle == 0) {
     642             :                                 /* store the waiting indexes */
     643      553764 :                                 waiting_map[(*waiting_mac)++] = i;
     644             :                         }
     645             : 
     646      553775 :                         worker = &io->writer_map[i];
     647             : 
     648             :                         /* the two indexes cannot be equal */
     649      553775 :                         assert(io->writer_index != worker->index);
     650             : 
     651             :                         /* if the worker has finished this index */
     652      553775 :                         if (busy_index != worker->index) {
     653      553744 :                                 thread_mutex_unlock(&io->io_mutex);
     654             : 
     655             :                                 /* mark the worker as processed */
     656             :                                 /* setting the previous one to point at the next one */
     657      553744 :                                 *let = io->writer_list[i + 1];
     658             : 
     659             :                                 /* return the position */
     660      553744 :                                 *pos = i;
     661             : 
     662             :                                 /* on the first cycle, no one is waiting */
     663      553744 :                                 if (waiting_cycle == 0)
     664      553734 :                                         *waiting_mac = 0;
     665             : 
     666     1107488 :                                 return;
     667             :                         }
     668             : 
     669             :                         /* next position to check */
     670          31 :                         let = &io->writer_list[i + 1];
     671          31 :                 }
     672             : 
     673             :                 /* if no worker is ready, wait for an event */
     674          10 :                 thread_cond_wait(&io->write_done, &io->io_mutex);
     675             : 
     676             :                 /* count the cycles */
     677          10 :                 ++waiting_cycle;
     678          10 :         }
     679             : }
     680             : 
     681     1081517 : static void io_reader_worker(struct snapraid_worker* worker, struct snapraid_task* task)
     682             : {
     683             :         /* if we reached the end */
     684     1081517 :         if (task->position >= worker->io->block_max) {
     685             :                 /* complete a dummy task */
     686         118 :                 task->state = TASK_STATE_EMPTY;
     687             :         } else {
     688     1081399 :                 worker->func(worker, task);
     689             :         }
     690     1070843 : }
     691             : 
     692         639 : static void* io_reader_thread(void* arg)
     693             : {
     694         639 :         struct snapraid_worker* worker = arg;
     695             : 
     696             :         /* force completion of the first task */
     697         639 :         io_reader_worker(worker, &worker->task_map[0]);
     698             : 
     699             :         while (1) {
     700             :                 struct snapraid_task* task;
     701             : 
     702             :                 /* get the new task */
     703     1159719 :                 task = io_reader_step(worker);
     704             : 
     705             :                 /* if there is no task, the thread has to exit */
     706     1163823 :                 if (!task)
     707         645 :                         break;
     708             : 
     709             :                 /* nothing more to do */
     710     1163178 :                 if (task->state == TASK_STATE_EMPTY)
     711       82346 :                         continue;
     712             : 
     713     1080832 :                 assert(task->state == TASK_STATE_READY);
     714             : 
     715             :                 /* work on the assigned task */
     716     1080832 :                 io_reader_worker(worker, task);
     717     1159083 :         }
     718             : 
     719         645 :         return 0;
     720             : }
     721             : 
     722         462 : static void* io_writer_thread(void* arg)
     723             : {
     724         462 :         struct snapraid_worker* worker = arg;
     725         462 :         int latest_state = TASK_STATE_DONE;
     726             : 
     727             :         while (1) {
     728             :                 struct snapraid_task* task;
     729             : 
     730             :                 /* get the new task */
     731      551134 :                 task = io_writer_step(worker, latest_state);
     732             : 
     733             :                 /* if there is no task, the thread has to exit */
     734      554046 :                 if (!task)
     735         461 :                         break;
     736             : 
     737             :                 /* nothing more to do */
     738      553585 :                 if (task->state == TASK_STATE_EMPTY) {
     739       14361 :                         latest_state = TASK_STATE_DONE;
     740       14361 :                         continue;
     741             :                 }
     742             : 
     743      539224 :                 assert(task->state == TASK_STATE_READY);
     744             : 
     745             :                 /* work on the assigned task */
     746      539224 :                 worker->func(worker, task);
     747             : 
     748             :                 /* save the resulting state */
     749      536311 :                 latest_state = task->state;
     750      550672 :         }
     751             : 
     752         461 :         return 0;
     753             : }
     754             : 
     755          95 : static void io_start_thread(struct snapraid_io* io,
     756             :         block_off_t blockstart, block_off_t blockmax,
     757             :         int (*block_is_enabled)(void* arg, block_off_t), void* blockarg)
     758             : {
     759             :         unsigned i;
     760             : 
     761          95 :         io->block_start = blockstart;
     762          95 :         io->block_max = blockmax;
     763          95 :         io->block_is_enabled = block_is_enabled;
     764          95 :         io->block_arg = blockarg;
     765          95 :         io->block_next = blockstart;
     766             : 
     767          95 :         io->done = 0;
     768          95 :         io->reader_index = io->io_max - 1;
     769          95 :         io->writer_index = 0;
     770             : 
     771             :         /* clear writer errors */
     772         475 :         for (i = 0; i < IO_WRITER_ERROR_MAX; ++i)
     773         380 :                 io->writer_error[i] = 0;
     774             : 
     775             :         /* setup the initial read pending tasks, except the latest one, */
     776             :         /* the latest will be initialized at the first io_read_next() call */
     777       12160 :         for (i = 0; i < io->io_max - 1; ++i) {
     778       12065 :                 block_off_t blockcur = io_position_next(io);
     779             : 
     780       12065 :                 io_reader_sched(io, i, blockcur);
     781             :         }
     782             : 
     783             :         /* setup the lists of workers to process */
     784          95 :         io->reader_list[0] = io->reader_max;
     785         652 :         for (i = 0; i <= io->writer_max; ++i)
     786         557 :                 io->writer_list[i] = i;
     787             : 
     788             :         /* start the reader threads */
     789         743 :         for (i = 0; i < io->reader_max; ++i) {
     790         648 :                 struct snapraid_worker* worker = &io->reader_map[i];
     791             : 
     792         648 :                 worker->index = 0;
     793             : 
     794         648 :                 thread_create(&worker->thread, 0, io_reader_thread, worker);
     795             :         }
     796             : 
     797             :         /* start the writer threads */
     798         557 :         for (i = 0; i < io->writer_max; ++i) {
     799         462 :                 struct snapraid_worker* worker = &io->writer_map[i];
     800             : 
     801         462 :                 worker->index = io->io_max - 1;
     802             : 
     803         462 :                 thread_create(&worker->thread, 0, io_writer_thread, worker);
     804             :         }
     805          95 : }
     806             : 
     807          95 : static void io_stop_thread(struct snapraid_io* io)
     808             : {
     809             :         unsigned i;
     810             : 
     811          95 :         thread_mutex_lock(&io->io_mutex);
     812             : 
     813             :         /* mark that we are stopping */
     814          95 :         io->done = 1;
     815             : 
     816             :         /* signal all the threads to recognize the new state */
     817          95 :         thread_cond_broadcast(&io->read_sched);
     818          95 :         thread_cond_broadcast(&io->write_sched);
     819             : 
     820          95 :         thread_mutex_unlock(&io->io_mutex);
     821             : 
     822             :         /* wait for all readers to terminate */
     823         743 :         for (i = 0; i < io->reader_max; ++i) {
     824         648 :                 struct snapraid_worker* worker = &io->reader_map[i];
     825             :                 void* retval;
     826             : 
     827             :                 /* wait for thread termination */
     828         648 :                 thread_join(worker->thread, &retval);
     829             :         }
     830             : 
     831             :         /* wait for all writers to terminate */
     832         557 :         for (i = 0; i < io->writer_max; ++i) {
     833         462 :                 struct snapraid_worker* worker = &io->writer_map[i];
     834             :                 void* retval;
     835             : 
     836             :                 /* wait for thread termination */
     837         462 :                 thread_join(worker->thread, &retval);
     838             :         }
     839          95 : }
     840             : 
     841             : #endif
     842             : 
     843             : /*****************************************************************************/
     844             : /* global */
     845             : 
     846          96 : void io_init(struct snapraid_io* io, struct snapraid_state* state,
     847             :         unsigned io_cache, unsigned buffer_max,
     848             :         void (*data_reader)(struct snapraid_worker*, struct snapraid_task*),
     849             :         struct snapraid_handle* handle_map, unsigned handle_max,
     850             :         void (*parity_reader)(struct snapraid_worker*, struct snapraid_task*),
     851             :         void (*parity_writer)(struct snapraid_worker*, struct snapraid_task*),
     852             :         struct snapraid_parity_handle* parity_handle_map, unsigned parity_handle_max)
     853             : {
     854             :         unsigned i;
     855             :         size_t allocated;
     856             : 
     857          96 :         io->state = state;
     858             : 
     859             : #if HAVE_PTHREAD
     860          96 :         if (io_cache == 0) {
     861             :                 /* default is 8 MiB of cache */
     862             :                 /* this seems to be a good tradeoff between speed and memory usage */
     863          95 :                 io->io_max = 8 * 1024 * 1024 / state->block_size;
     864          95 :                 if (io->io_max < IO_MIN)
     865           0 :                         io->io_max = IO_MIN;
     866          95 :                 if (io->io_max > IO_MAX)
     867          95 :                         io->io_max = IO_MAX;
     868             :         } else {
     869           1 :                 io->io_max = io_cache;
     870             :         }
     871             : #else
     872             :         (void)io_cache;
     873             : 
     874             :         /* without pthread, force the mono thread mode */
     875             :         io->io_max = 1;
     876             : #endif
     877             : 
     878          96 :         assert(io->io_max == 1 || (io->io_max >= IO_MIN && io->io_max <= IO_MAX));
     879             : 
     880          96 :         io->buffer_max = buffer_max;
     881          96 :         allocated = 0;
     882       12257 :         for (i = 0; i < io->io_max; ++i) {
     883       12161 :                 if (state->file_mode != ADVISE_DIRECT)
     884       12161 :                         io->buffer_map[i] = malloc_nofail_vector_align(handle_max, buffer_max, state->block_size, &io->buffer_alloc_map[i]);
     885             :                 else
     886           0 :                         io->buffer_map[i] = malloc_nofail_vector_direct(handle_max, buffer_max, state->block_size, &io->buffer_alloc_map[i]);
     887       12161 :                 if (!state->opt.skip_self)
     888           0 :                         mtest_vector(io->buffer_max, state->block_size, io->buffer_map[i]);
     889       12161 :                 allocated += state->block_size * buffer_max;
     890             :         }
     891             : 
     892          96 :         msg_progress("Using %u MiB of memory for %u blocks of IO cache.\n", (unsigned)(allocated / MEBI), io->io_max);
     893             : 
     894          96 :         if (parity_writer) {
     895          83 :                 io->reader_max = handle_max;
     896          83 :                 io->writer_max = parity_handle_max;
     897             :         } else {
     898          13 :                 io->reader_max = handle_max + parity_handle_max;
     899          13 :                 io->writer_max = 0;
     900             :         }
     901             : 
     902          96 :         io->reader_map = malloc_nofail(sizeof(struct snapraid_worker) * io->reader_max);
     903          96 :         io->reader_list = malloc_nofail(io->reader_max + 1);
     904          96 :         io->writer_map = malloc_nofail(sizeof(struct snapraid_worker) * io->writer_max);
     905          96 :         io->writer_list = malloc_nofail(io->writer_max + 1);
     906             : 
     907          96 :         io->data_base = 0;
     908          96 :         io->data_count = handle_max;
     909          96 :         io->parity_base = handle_max;
     910          96 :         io->parity_count = parity_handle_max;
     911             : 
     912         750 :         for (i = 0; i < io->reader_max; ++i) {
     913         654 :                 struct snapraid_worker* worker = &io->reader_map[i];
     914             : 
     915         654 :                 worker->io = io;
     916             : 
     917         654 :                 if (i < handle_max) {
     918             :                         /* it's a data read */
     919         576 :                         worker->handle = &handle_map[i];
     920         576 :                         worker->parity_handle = 0;
     921         576 :                         worker->func = data_reader;
     922             : 
     923             :                         /* data read is put in lower buffer index */
     924         576 :                         worker->buffer_skew = 0;
     925             :                 } else {
     926             :                         /* it's a parity read */
     927          78 :                         worker->handle = 0;
     928          78 :                         worker->parity_handle = &parity_handle_map[i - handle_max];
     929          78 :                         worker->func = parity_reader;
     930             : 
     931             :                         /* parity read is put after data and computed parity */
     932          78 :                         worker->buffer_skew = parity_handle_max;
     933             :                 }
     934             :         }
     935             : 
     936         559 :         for (i = 0; i < io->writer_max; ++i) {
     937         463 :                 struct snapraid_worker* worker = &io->writer_map[i];
     938             : 
     939         463 :                 worker->io = io;
     940             : 
     941             :                 /* it's a parity write */
     942         463 :                 worker->handle = 0;
     943         463 :                 worker->parity_handle = &parity_handle_map[i];
     944         463 :                 worker->func = parity_writer;
     945             : 
     946             :                 /* parity to write is put after data */
     947         463 :                 worker->buffer_skew = handle_max;
     948             :         }
     949             : 
     950             : #if HAVE_PTHREAD
     951          96 :         if (io->io_max > 1) {
     952          95 :                 io_read_next = io_read_next_thread;
     953          95 :                 io_write_preset = io_write_preset_thread;
     954          95 :                 io_write_next = io_write_next_thread;
     955          95 :                 io_refresh = io_refresh_thread;
     956          95 :                 io_data_read = io_data_read_thread;
     957          95 :                 io_parity_read = io_parity_read_thread;
     958          95 :                 io_parity_write = io_parity_write_thread;
     959          95 :                 io_start = io_start_thread;
     960          95 :                 io_stop = io_stop_thread;
     961             : 
     962          95 :                 thread_mutex_init(&io->io_mutex, 0);
     963          95 :                 thread_cond_init(&io->read_done, 0);
     964          95 :                 thread_cond_init(&io->read_sched, 0);
     965          95 :                 thread_cond_init(&io->write_done, 0);
     966          95 :                 thread_cond_init(&io->write_sched, 0);
     967             :         } else
     968             : #endif
     969             :         {
     970           1 :                 io_read_next = io_read_next_mono;
     971           1 :                 io_write_preset = io_write_preset_mono;
     972           1 :                 io_write_next = io_write_next_mono;
     973           1 :                 io_refresh = io_refresh_mono;
     974           1 :                 io_data_read = io_data_read_mono;
     975           1 :                 io_parity_read = io_parity_read_mono;
     976           1 :                 io_parity_write = io_parity_write_mono;
     977           1 :                 io_start = io_start_mono;
     978           1 :                 io_stop = io_stop_mono;
     979             :         }
     980          96 : }
     981             : 
     982          96 : void io_done(struct snapraid_io* io)
     983             : {
     984             :         unsigned i;
     985             : 
     986       12257 :         for (i = 0; i < io->io_max; ++i) {
     987       12161 :                 free(io->buffer_map[i]);
     988       12161 :                 free(io->buffer_alloc_map[i]);
     989             :         }
     990             : 
     991          96 :         free(io->reader_map);
     992          96 :         free(io->reader_list);
     993          96 :         free(io->writer_map);
     994          96 :         free(io->writer_list);
     995             : 
     996             : #if HAVE_PTHREAD
     997          96 :         if (io->io_max > 1) {
     998          95 :                 thread_mutex_destroy(&io->io_mutex);
     999          95 :                 thread_cond_destroy(&io->read_done);
    1000          95 :                 thread_cond_destroy(&io->read_sched);
    1001          95 :                 thread_cond_destroy(&io->write_done);
    1002          95 :                 thread_cond_destroy(&io->write_sched);
    1003             :         }
    1004             : #endif
    1005          96 : }
    1006             : 

Generated by: LCOV version 1.13