пятница, 28 ноября 2014 г.

Перезапуск падучего приложения из самого приложения (спасение утопающих ...)

Представьте себе ситуацию, когда у вас есть некоторое серверное приложение, которое потенциально может падать (я не буду рассуждать о причинах: допустим, это новое, написанное вами приложение, которое вы хотите считать абсолютно надежным, но это не так). Предположим, что вам ни в коем случае нельзя допускать простоя службы, реализуемой этим приложением. Есть разные способы следить за работой программ и перезапускать их в случае надобности извне (например, cron). Но … Спасение утопающих — дело рук самих утопающих. Именно этот подход я и хочу продемонстрировать в самых общих чертах. Разумеется, вы должны обладать исходным кодом программы. В этом случае достаточно выделить исходный полезный код в отдельный дочерний процесс (worker), а исходный процесс превратить в надсмотрщика (master или supervisor), который будет перезапускать полезный дочерний процесс в случае его падения. Вот код, а пояснения ниже.
#include <unistd.h>
#include <sys/wait.h>
#include <string.h>
#include <iostream>

#ifdef MAXCYCLES
#define LOOPSTOPCOND i < MAXCYCLES
#else
#define LOOPSTOPCOND
#endif


int  main( int  argc, char **  argv )
{
    int  pid( 0 );

    std::cout << "Master: " << getpid() << std::endl;

    for ( int  i( 0 ); LOOPSTOPCOND; ++i )
    {
        if ( ( pid = fork() ) == 0 )    /* Worker process */
        {
            std::cout << "(cycle " << i << ") Worker: " << getpid() <<
                    std::endl;

            /* do segv if option -s was specified in command line */
            if ( argc > 1 && ! strcmp( argv[ 1 ], "-s" ) )
                int  a( *( ( int * )0 ) );

            break;
        }
        else                            /* Master process */
        {
            if ( pid < 0 )
            {
                std::cout << "Failed to fork a worker process, exiting" <<
                        std::endl;
                break;
            }

            int   status( 0 );
            bool  respawn( false );

            waitpid( pid, &status, 0 );

            if ( ! WIFSIGNALED( status ) )
                break;

            int  sig( WTERMSIG( status ) );

            std::cout << "Worker process was signaled " << sig <<  std::endl;

            switch ( sig )
            {
            case SIGSEGV:
                respawn = true;
            default:
                break;
            }

            if ( ! respawn )
                break;
        }
    }

    return 0;
}
Прежде всего нужно отметить, что код написан на C++, хотя ничто не мешает переписать его на C, если понадобится. Макросы MAXCYCLES и LOOPSTOPCOND нужны здесь только в демонстрационных целях: дабы ограничить число перезапусков воркеров величиной, заданной во время компиляции (MAXCYCLES). В релизной версии приложения цикл for скорее всего примет вид for ( ; ; ), так что эти макросы окажутся не нужны. Итак, цикл for нужен для перезапуска воркер-процессов в случае их падения. Внутри цикла for с помощью вызова fork() производится расщепление основного процесса, связанного с приложением, на две части — новый процесс (воркер) и старый (мастер). Как известно, fork() возвращает 0 в новом процессе, и какое-либо значение большее нуля — в старом. В новом процессе (блок после if ( ( pid = fork() ) == 0 )) мы выводим сообщение о старте воркера, вызываем его полезный код (в этом примере полезный код присутствует только в случае, когда в программу была передана опция командной строки -s: он приводит к посылке ядром сигнала SIGSEGV из-за разыменования нулевого указателя), и в конце обязательно ставим break;, поскольку воркер по-прежнему находится в цикле, и без его прерывания продолжит запускать новые воркеры, притворившись мастером! Внутри кода, относящегося к мастер-процессу (блок после else), прежде всего проверяется, что вызов функции fork() завершился успешно. Затем мастер ожидает завершения воркера с помощью вызова waitpid() и проверяет, каким образом тот завершился. Макрос WIFSIGNALED() позволяет установить, был ли воркер остановлен сигналом ядра или вышел самостоятельно. Во втором случае цикл for прерывается и мастер завершает свою работу. В случае, если воркер был остановлен сигналом, мы хотим узнать каким именно. Для этого предназначен макрос WTERMSIG(). Если это был сигнал SIGSEGV, то локальной переменной respawn присваивается истинное значение. Если ее значение остается ложным (когда воркер был прерван любым другим сигналом), то цикл завершается. Давайте посмотрим, как это работает (исходный файл я назвал main.cpp). Компилируем.
g++ -g -DMAXCYCLES=4 -o test main.cpp
Я ограничил число перезапусков четырьмя. Запускаем программу с нормальным завершением воркера.
./test 
Master: 6061
(cycle 0) Worker: 6062
Завершился воркер — завершился мастер. А теперь запустим программу с падучим воркером.
./test -s
Master: 6085
(cycle 0) Worker: 6086
Worker process was signaled 11
(cycle 1) Worker: 6152
Worker process was signaled 11
(cycle 2) Worker: 6154
Worker process was signaled 11
(cycle 3) Worker: 6156
Worker process was signaled 11
Мастер-процесс четыре раза перезапустил упавшие воркер-процессы, как и ожидалось. Я не стал касаться вопросов наследования ресурсов мастера при инициализации воркера и корректного завершения воркера при получении мастером сигнала прерывания — это отдельные интересные темы.

понедельник, 10 ноября 2014 г.

Ручное распараллеливание вычислений в haskell

Возьмем самую быструю реализацию задачи из предыдущей статьи.
import qualified Data.List as L
import qualified Data.List.Ordered as LO
import Data.Ord (comparing)
import Control.Arrow

alphabet :: [Char]
alphabet = "ACGT"

subSeqs :: Int -> String -> [String]
subSeqs n = takeWhile ((==n) . length) . map (take n) . L.tails

nearSeqs :: Int -> String -> [String]
nearSeqs n = LO.nubSort . nearSeqs' n
    where nearSeqs' 0 s = [s]
          nearSeqs' n s =
            concatMap (\(a, b) -> map (a++) b) $
                concatMap (\m -> [(z ++ [x], nearSeqs' (n - 1) $ tail z') |
                                  x <- alphabet, let (z, z') = splitAt m s])
                [0 .. length s - 1]

grpSeqs :: Int -> [String] -> [[String]]
grpSeqs n = L.sortBy (flip $ comparing length) . L.group . L.sort . seqs
    where seqs = concatMap (\(s, m) -> concat . replicate m . nearSeqs n $ s) .
                    map (head &&& length) . L.group . L.sort

main = do
    let seqs = ["ACGTTGCATGTCGCATGATGCATGAGAGCT",
                "CACAGTAGGCGCCGGCACACACAGCCCCGGGCCCCGGGCCGCCCCGGGCCGGC\
                \GGCCGCCGGCGCCGGCACACCGGCACAGCCGTACCGGCACAGTAGTACCGGCC\
                \GGCCGGCACACCGGCACACCGGGTACACACCGGGGCGCACACACAGGCGGGCG\
                \CCGGGCCCCGGGCCGTACCGGGCCGCCGGCGGCCCACAGGCGCCGGCACAGTA\
                \CCGGCACACACAGTAGCCCACACACAGGCGGGCGGTAGCCGGCGCACACACAC\
                \ACAGTAGGCGCACAGCCGCCCACACACACCGGCCGGCCGGCACAGGCGGGCGG\
                \GCGCACACACACCGGCACAGTAGTAGGCGGCCGGCGCACAGCC"]
    mapM_ print' $ take 10 $ grpSeqs 2 $ subSeqs 10 $ seqs !! 1
    where print' = print . (head &&& length)
Я все же решил использовать Data.List.Ordered.nubSort для удаления дубликатов в nearSeqs, поэтому программа должна работать еще чуточку быстрее. В среднем новая версия работает порядка 0.51 секунды, то есть действительно быстрее на ∼0.03 секунды. Отлично. Давайте отпрофилируем нашу программу и посмотрим, что можно сделать, чтобы она работала еще быстрее. Компилируем с поддержкой профилирования.
ghc --make -rtsopts -prof -auto-all -caf-all -fforce-recomp gen
Запускаем программу с записью данных профилирования в файл.
./gen +RTS -p
Смотрим результаты профилирования в файле gen.prof.
    Mon Nov 10 16:38 2014 Time and Allocation Profiling Report  (Final)

       gen +RTS -p -RTS

    total time  =        0.77 secs   (766 ticks @ 1000 us, 1 processor)
    total alloc = 593,276,296 bytes  (excludes profiling overheads)

COST CENTRE                MODULE  %time %alloc

grpSeqs                    Main     52.0   24.6
nearSeqs                   Main     20.6   10.7
nearSeqs.nearSeqs'.\       Main      8.4   19.9
nearSeqs.nearSeqs'.\       Main      7.8   19.5
nearSeqs.nearSeqs'         Main      4.6    9.9
nearSeqs.nearSeqs'.\.(...) Main      4.3   12.3
grpSeqs.seqs               Main      0.9    1.5
grpSeqs.seqs.\             Main      0.4    1.5


                                                                              individual     inherited
COST CENTRE                         MODULE                  no.     entries  %time %alloc   %time %alloc

MAIN                                MAIN                     51           0    0.0    0.0   100.0  100.0
 CAF:main                           :Main                   101           0    0.0    0.0     0.0    0.0
  main                              Main                    116           0    0.0    0.0     0.0    0.0
   main.print'                      Main                    118           0    0.0    0.0     0.0    0.0
 CAF:main                           Main                    100           0    0.0    0.0   100.0  100.0
  main                              Main                    102           1    0.0    0.0   100.0  100.0
   main.print'                      Main                    117           1    0.0    0.0     0.0    0.0
   main.seqs                        Main                    106           1    0.0    0.0     0.0    0.0
   subSeqs                          Main                    105           1    0.0    0.0     0.0    0.0
   grpSeqs                          Main                    103           1   52.0   24.6   100.0   99.9
    grpSeqs.seqs                    Main                    104           1    0.9    1.5    48.0   75.3
     grpSeqs.seqs.\                 Main                    107         254    0.4    1.5    47.1   73.8
      nearSeqs                      Main                    108         254   20.6   10.7    46.7   72.3
       nearSeqs.nearSeqs'           Main                    109      193294    4.6    9.9    26.1   61.7
        nearSeqs.nearSeqs'.\        Main                    112      193040    8.4   19.9     8.4   19.9
        nearSeqs.nearSeqs'.\        Main                    110       48260    7.8   19.5    13.2   31.8
         nearSeqs.nearSeqs'.\.z     Main                    115      192024    0.3    0.0     0.3    0.0
         nearSeqs.nearSeqs'.\.(...) Main                    114      193040    4.3   12.3     4.3   12.3
         nearSeqs.nearSeqs'.\.z'    Main                    113      183951    0.8    0.0     0.8    0.0
 CAF:$dShow_r1F9                    Main                     99           0    0.0    0.0     0.0    0.0
 CAF:$dOrd_r1F8                     Main                     98           0    0.0    0.0     0.0    0.0
 CAF:$dEq_r1D9                      Main                     97           0    0.0    0.0     0.0    0.0
 CAF:alphabet_ryE                   Main                     96           0    0.0    0.0     0.0    0.0
  alphabet                          Main                    111           1    0.0    0.0     0.0    0.0
 CAF                                GHC.Conc.Signal          92           0    0.0    0.0     0.0    0.0
 CAF                                GHC.IO.Handle.FD         84           0    0.0    0.0     0.0    0.0
 CAF                                GHC.IO.Encoding          79           0    0.0    0.0     0.0    0.0
 CAF                                GHC.IO.Encoding.Iconv    68           0    0.0    0.0     0.0    0.0
Видим, что абсолютным чемпионом по времязатратам является функция grpSeqs с результатом 52% от всего времени выполнения программы, за ней идет функция nearSeqs20.6%. Функция grpSeqs состоит сплошь из сортировок и группировок списков — эти типы вычислений не имеют значительных перспектив для распараллеливания. С другой стороны, списки, возвращаемые функцией nearSeqs для 254 уникальных подпоследовательностей исходной последовательности seqs !! 1, могут быть вычислены независимо, а это может означать потенциально ожидаемый 15% выигрыш по времени для 4-ядерной машины (20.6% - (20.6% / 4) ≈ 15%). Перепишем функцию grpSeqs с параллельным вычислением nearSeqs. В список импортируемых модулей добавляем
import Control.Parallel.Stategies
Определяем новую функцию grpSeqsPar.
grpSeqsPar :: Int -> [String] -> [[String]]
grpSeqsPar n = L.sortBy (flip $ comparing length) . L.group . L.sort . seqs
    where seqs = concat . parMap rdeepseq
                    (\(s, m) -> concat . replicate m . nearSeqs n $ s) .
                        map (head &&& length) . L.group . L.sort
Не забываем заменить вызов grpSeqs на grpSeqsPar в функции main.
    mapM_ print' $ take 10 $ grpSeqsPar 2 $ subSeqs 10 $ seqs !! 1
Компилируем с поддержкой распараллеливания.
ghc --make -O2 -rtsopts -threaded -feager-blackholing -fforce-recomp gen
На моей машине установлен 4-ядерный процессор Intel Core i5. Запускаем с распараллеливанием на 4 ядра.
time ./gen +RTS -N4
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.521s
user    0m1.049s
sys     0m0.339s
Что-то никакого толку. Давайте запустим программу с опцией -s, которая выводит разную полезную информацию на экран.
time ./gen +RTS -N4 -s
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)
     316,334,968 bytes allocated in the heap
     161,177,512 bytes copied during GC
      18,120,968 bytes maximum residency (10 sample(s))
         310,528 bytes maximum slop
              53 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0       365 colls,   365 par    0.54s    0.14s     0.0004s    0.0147s
  Gen  1        10 colls,     9 par    0.30s    0.08s     0.0084s    0.0159s

  Parallel GC work balance: 38.30% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 254 (254 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.60s  (  0.30s elapsed)
  GC      time    0.84s  (  0.23s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    1.44s  (  0.53s elapsed)

  Alloc rate    529,187,979 bytes per MUT second

  Productivity  41.9% of total user, 113.3% of total elapsed

gc_alloc_block_sync: 67687
whitehole_spin: 0
gen[0].sync: 22
gen[1].sync: 10747

real    0m0.536s
user    0m1.025s
sys     0m0.417s
Продуктивность программы составила 41.9%, остальное время ушло на сборку мусора (GC time 0.84s)! Обычно с этим борются увеличением предполагаемого размера кучи (suggested heap size) для сборщика мусора (см. документацию здесь). Давайте установим эту величину в 1 гигабайт.
time ./gen +RTS -N4 -H1G -s
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)
     315,385,264 bytes allocated in the heap
      17,826,760 bytes copied during GC
      15,081,784 bytes maximum residency (2 sample(s))
         118,472 bytes maximum slop
             970 MB total memory in use (0 MB lost due to fragmentation)

                                    Tot time (elapsed)  Avg pause  Max pause
  Gen  0         1 colls,     1 par    0.03s    0.01s     0.0110s    0.0110s
  Gen  1         2 colls,     1 par    0.12s    0.04s     0.0178s    0.0321s

  Parallel GC work balance: 83.58% (serial 0%, perfect 100%)

  TASKS: 10 (1 bound, 9 peak workers (9 total), using -N4)

  SPARKS: 254 (254 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time    0.56s  (  0.31s elapsed)
  GC      time    0.15s  (  0.05s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time    0.72s  (  0.37s elapsed)

  Alloc rate    567,298,189 bytes per MUT second

  Productivity  78.5% of total user, 153.2% of total elapsed

gc_alloc_block_sync: 15012
whitehole_spin: 0
gen[0].sync: 2831
gen[1].sync: 0

real    0m0.391s
user    0m0.600s
sys     0m0.137s
А вот это уже здорово! Время выполнения программы сократилось до 0.39 секунды, продуктивность увеличилась до 78.5%. Кстати, в выводе программы есть строка, озаглавленная SPARKS: в ней находится статистика запусков параллельных вычислений, которые называются искрами (sparks). Всего было запущено 254 искры, что соответствует числу уникальных 10-меров в исходной последовательности. А что будет, если в оригинальной, нераспараллеленной версии выделить 1 гигабайт для кучи? Может быть она тоже ускорится? Как показала практика — нет. Продуктивность программы действительно возрастает, но время выполнения практически не изменяется, как это ни странно. И, наконец, следующая версия функции grpSeqsPar работает еще чуточку быстрее (но незначительно).
grpSeqsPar :: Int -> Int -> [String] -> [[String]]
grpSeqsPar m n = L.sortBy (flip $ comparing length) . L.group . sortPar . seqs
    where sortPar = foldr LO.merge [] . parMap rdeepseq L.sort
          seqs = concat' m . parMap rdeepseq
                    (\(s, m) -> concat . replicate m . nearSeqs n $ s) .
                        map (head &&& length) . L.group . L.sort
          concat' m s = let l = length s
                        in map concat $ chunksOf
                            (l `div` m + if l `mod` m == 0 then 0 else 1) s
Вот только я совсем не уверен в ее оптимальности (именно в оптимальности, а не в корректности): в самом деле, вложенные стратегии rdeepseq скорее всего не самый лучший вариант. В этой версии затратная сортировка списка, возвращаемого функцией seqs, заменена на серию последовательных слияний LO.merge его по отдельности отсортированных частей. Количество частей, на которые разбивается исходный список, задается новым параметром m функции grpSeqsPar. На мой взгляд, он должен быть равен количеству ядер процессора. Соответственно, вызов в функции main принимает вид
    mapM_ print' $ take 10 $ grpSeqsPar 4 2 $ subSeqs 10 $ seqs !! 1
Для использования функции chunksOf в список импортируемых модулей следует добавить
import Data.List.Split (chunksOf)
Если скомпилировать новую программу и запустить ее на выполнение, то в строке SPARKS появятся несколько десятков погасших искр (fizzled sparks). Искры гаснут в том случае, когда танки (thunks), то есть элементы для вычисления, на которые они ссылаются, уже были вычислены в некотором параллельном потоке программы. Такая ситуация может свидетельствовать о неоптимальном выборе стратегии параллелизации. Опция компиляции -feager-blackholing позволяет предотвратить многократные вычисления одних и тех же танков, поэтому само по себе появление погасших искр не должно влиять на производительность программы. Что еще можно распараллелить в этой программе? Ну, например, функцию subSeqs. Хотя в данном случае это не даст никаких преимуществ, поскольку время выполнения этой функции по результатам профилирования составляет 0.0%. Тем не менее, для полноты картины я приведу параллельную версию subSeqs.
subSeqsPar :: Int -> Int -> String -> [String]
subSeqsPar m n s = subSeqs n s `using` parListChunk (length s `div` m) rdeepseq
Здесь я решил применить parListChunk со стратегией rdeepseq. Размер чанка вычисляется на основании значения первого параметра m функции subSeqsPar путем разделения исходной последовательности s на m равных частей: это упрощенный аналог алгоритма из функции concat’. Update. Вложенная стратегия parMap rdeepseq для вычисления всех мутаций с помощью nearSeqs в функции grpSeqsPar действительно не нужна: внешняя стратегия из функции sortPar должна прекрасно справиться с распараллеливанием сама. По этой же самой причине subSeqsPar тоже не нужна в принципе. Таким образом, функция grpSeqsPar переписывается так:
grpSeqsPar :: Int -> Int -> [String] -> [[String]]
grpSeqsPar m n = L.sortBy (flip $ comparing length) . L.group . sortPar . seqs
    where sortPar = foldr LO.merge [] . parMap rdeepseq L.sort
          seqs = concat' m .
                    map (\(s, m) -> concat . replicate m . nearSeqs n $ s) .
                        map (head &&& length) . L.group . L.sort
          concat' m s = let l = length s
                        in map concat $ chunksOf
                            (l `div` m + if l `mod` m == 0 then 0 else 1) s
Новая стратегия должна сократить число искр до четырех и потенциально ускорить выполнение программы. Кстати, для визуализации выполнения параллельной программы на haskell существует прекраснейший инструмент — программа threadscope. Для того, чтобы воспользоваться threadscope, нужно скомпилировать нашу программу с поддержкой журнала событий.
ghc --make -O2 -rtsopts -threaded -eventlog -feager-blackholing -fforce-recomp gen
Давайте запустим программу с опцией -H1G, как мы это делали раньше.
time ./gen +RTS -N4 -H1G -qa -lf
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.282s
user    0m0.525s
sys     0m0.118s
Не обращайте внимание на новый рекорд скорости (0.282 сек.): просто я запустил программу на более быстрой машине. Зато обратите внимание на новые опции: -qa и -lf. Первая опция экспериментальная и в документации haskell говорится, что она может ускорить выполнение параллельной программы, а может и не ускорить. Для меня она действительно работает. Вторая опция приводит к записи событий в файл gen.eventlog. Давайте откроем этот файл в программе threadscope. Вот такая картинка появится на экране. Что ж, здесь достаточно много информации как на графике в центральном окне, так и во вкладках, расположенных в нижнем окне. Верхняя часть графика — Activity — отражает степень загрузки потоков выполнения (у нас их четыре) в течении всего времени жизни программы. Четыре плотных зеленых слоя, начинающиеся на 25 мс означают, что все четыре потока работают на полную катушку: это очень хорошо. Начиная с 0.13 с программа стала работать в одном потоке: это финальная сортировка по длинам в функции grpSeqsPar (вы можете убедиться в этом, удалив L.sortBy (flip $ comparing length) . из определения grpSeqsPar и запустив программу заново). А что же это за провал, длившийся 20 мс в самом начале программы, когда активность была нулевой и все потоки занимались сборкой мусора (оранжевые полосы)? К счастью, график является интерактивным, и мы можем узнать, что произошло в определенной точке выполнения программы, кликнув мышкой по графику в этой точке и просмотрев события во вкладке Raw events. Ага, переполнение кучи. Опция -H1G не влияет на первый запуск сборщика мусора: для этого нужно настроить опцию -A. Давайте запустим программу с этой опцией.
time ./gen +RTS -N4 -A128M -qa -lf
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.270s
user    0m0.471s
sys     0m0.115s
Я подобрал размер аллокации (128M) таким образом, чтобы единожды выделенной кучи хватило на все время выполнения программы. Теперь threadscope покажет такую картинку. На этот раз продуктивность программы составила 99% против 93% в предыдущем запуске, сборщик мусора практически не задействован. Однако время выполнения почти не изменилось. Помните, я говорил, что это странно. Так вот, никакой странности здесь оказывается нет: просто время сборки мусора эффективно заменилось временем, затраченным на аллокацию и управление гораздо бо́льшими объемами памяти. В данном случае значительное время было потрачено в фазе инициализации (INIT): это хорошо видно на графике. А что будет, если использовать параметры аллокации кучи для сборщика мусора по умолчанию?
time ./gen +RTS -N4 -qa -lf
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.312s
user    0m0.877s
sys     0m0.169s
Интересный результат. Время выполнения немного выросло. Время сборщика мусора (GC time) теперь равно времени полезной нагрузки (Mutator time), и это видно по соотношению зеленого и оранжевого цветов в потоках. Загрузка просела до двух (чуть позже трех) потоков одновременно. Несмотря на это, время выполнения, как я уже сказал, выросло ненамного: видимо это связано с более эффективной стратегией аллокации и меньшим потреблением памяти. Update 2. Функцию concat’ из grpSeqsPar можно переписать с помощью стандартной функции ceiling.
          concat' m s = let l = let (/.) = (/) `on` fromIntegral
                                in ceiling $ length s /. m
                        in map concat $ chunksOf l s
Для функции on нужен
import Data.Function (on)
Определение функции (/.) я взял отсюда. Она нужна для преобразования типов (или коэрции) аргументов операции вещественного деления (/) из Int в абстрактный тип Num a => a. Без этого преобразования типов программа не скомпилируется: неявные преобразования типов в стиле C и C++ в haskell не поддерживаются.

среда, 5 ноября 2014 г.

Решение задачи о поиске наиболее часто встречающейся подпоследовательности с заданным максимальным числом мутаций на haskell

Эта задача имеет большое значение в биологии и часто формулируется как поиск наиболее часто встречающегося k-мера с максимально допустимым числом мутаций n внутри заданной генетической последовательности. Генетическая последовательность обычно моделируется строкой произвольной длины, составленной из символов некоторого алфавита. Соответственно, k-мер — это подстрока длиной k внутри генетической последовательности. В постановке задачи искомые k-меры могут накладываться друг на друга. Так, если общая длина исходной последовательности равна 10, то в ней определены один 10-мер, два 9-мера, три 8-мера и т.д. Благодаря условию о допустимых мутациях искомая наиболее часто повторяющаяся подпоследовательность может вообще ни разу не встретиться в исходной последовательности! Для решения задачи предложим такой алгоритм: для каждого k-мера в заданной последовательности находим все его мутации, включая исходное значение, складываем их вместе и находим наиболее часто встречающийся элемент: он и будет ответом на поставленную задачу. Я приведу решение, а ниже его прокомментирую.
import qualified Data.List as L
import Data.Ord (comparing)
import Control.Arrow

alphabet :: [Char]
alphabet = "ACGT"

subSeqs :: Int -> String -> [String]
subSeqs n = takeWhile ((==n) . length) . map (take n) . L.tails

nearSeqs :: Int -> String -> [String]
nearSeqs n = map head . L.group . L.sort . nearSeqs' n
    where nearSeqs' 0 s = [s]
          nearSeqs' n s =
            concatMap (\(a, b) -> map (a++) b) $
                concatMap (\m -> [(z ++ [x], nearSeqs' (n - 1) $ tail z') |
                                  x <- alphabet, let (z, z') = splitAt m s])
                [0 .. length s - 1]

grpSeqs :: Int -> [String] -> [[String]]
grpSeqs n = L.sortBy (flip $ comparing length) . L.group . L.sort . seqs
    where seqs = concatMap $ nearSeqs n

main = do
    let seqs = ["ACGTTGCATGTCGCATGATGCATGAGAGCT",
                "CACAGTAGGCGCCGGCACACACAGCCCCGGGCCCCGGGCCGCCCCGGGCCGGC\
                \GGCCGCCGGCGCCGGCACACCGGCACAGCCGTACCGGCACAGTAGTACCGGCC\
                \GGCCGGCACACCGGCACACCGGGTACACACCGGGGCGCACACACAGGCGGGCG\
                \CCGGGCCCCGGGCCGTACCGGGCCGCCGGCGGCCCACAGGCGCCGGCACAGTA\
                \CCGGCACACACAGTAGCCCACACACAGGCGGGCGGTAGCCGGCGCACACACAC\
                \ACAGTAGGCGCACAGCCGCCCACACACACCGGCCGGCCGGCACAGGCGGGCGG\
                \GCGCACACACACCGGCACAGTAGTAGGCGGCCGGCGCACAGCC"]
    {-print $ nearSeqs 2 "TTTT"-}
    {-mapM_ print' $ take 10 $ grpSeqs 1 $ subSeqs 4 $ head seqs-}
    mapM_ print' $ take 10 $ grpSeqs 2 $ subSeqs 10 $ seqs !! 1
    where print' = print . (head &&& length)
В функции main происходит тестирование алгоритма. Функция alphabet задает алфавит последовательности. Как видим, он состоит всего из четырех символов. Я намеренно задал тип alphabet как [Char], а не String, дабы подчеркнуть, что это алфавит, а не простая строка. Для простоты я не стал каким-либо образом верифицировать исходные последовательности в seqs, объявленной в main. Функция subSeqs возвращает все k-меры из заданной последовательности (ее первый формальный параметр n соответствует числу k). Функция nearSeqs возвращает список уникальных мутаций (включая исходное значение) некоторой последовательности, ее формальный параметр n задает максимально допустимое количество мутаций. Реализация этой функции основана на рекурсивном применении генератора списков по всевозможным перестановкам общего числа n позиций внутри заданной последовательности. Для каждой перестановки генерируются всевозможные мутации на основе алфавита alphabet: при таком подходе результирующий список может содержать повторяющиеся элементы, поэтому он фильтруется функцией map head . L.group . L.sort. Как выяснилось впоследствии, правильный выбор этой функции очень сильно влияет на скорость программы. Так, в моем оригинальном варианте L.nub . L.sort программа работала в два раза медленнее! Просто L.nub работает еще медленнее (все-таки это функция класса O(n^2)). Текущий вариант я подсмотрел здесь. Позже я нашел самый быстрый вариант: Data.List.Ordered.nubSort, но все же решил на нем не останавливаться, поскольку он требует импорта еще одного модуля. К слову, функцию nearSeqs можно определить без необходимости финальной сортировки списка, заранее вынимая те элементы из alpahabet, которые приведут к дубликатам.
nearSeqs'' :: Int -> String -> [String]
nearSeqs'' n s = s : concatMap (`nearSeqs'` s) [1 .. n]
    where nearSeqs' 0 s = [s]
          nearSeqs' n s =
            concatMap (\(a, b) -> map (a++) b) $
                concatMap (\m -> [(z ++ [x], nearSeqs' (n - 1) $ tail z') |
                                  let (z, z') = splitAt m s,
                                  x <- L.delete (head z') alphabet])
                [0 .. length s - 1]
Однако, эта функция работает медленнее, поэтому я ее забраковал. Чтобы посмотреть на функцию nearSeqs в деле, раскомментируйте первую закомментированную строку в main, скомпилируйте программу и запустите ее. Функция grpSeqs вызывает nearSeqs, сортирует и группирует возвращенный ею список, и далее сортирует сгруппированный список по длине его подсписков в порядке убывания. В функции main из этого списка берется 10 первых подсписков и выводятся их первый элемент (все элементы подсписков должны совпадать) и длина. Для компиляции всех примеров в этой статье я использовал флаг -O2, имя исходника — gen.hs. Вывод программы с замером времени выполнения:
time ./gen
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.639s
user    0m0.612s
sys     0m0.022s
Чуть больше, чем полсекунды. Неплохо. Однако, этого я достиг не сразу и поэтому перепробовал множество подходов. В частности, я пробовал использовать для сборки результатов nearSeqs хэш-таблицу Data.HashTable.IO. В список импортируемых модулей добавляем
import qualified Data.HashTable.IO as H
import Data.Maybe (fromMaybe)
В определения записываем
type HashTable k v = H.CuckooHashTable k v

grpSeqsHt :: Int -> Int -> [String] -> IO (HashTable String Int)
grpSeqsHt m n s = do
    h <- H.newSized m
    let insert' h k = H.lookup h k >>= H.insert h k . (+1) . fromMaybe 0
    mapM_ (mapM_ (insert' h) . nearSeqs n) s
    return h

mostOftenSeqsHt :: HashTable String Int -> IO [(String, Int)]
mostOftenSeqsHt = H.foldM findMax []
    where findMax [] a = return [a]
          findMax c@((_, v') : _) a@(_, v) =
            return $ case v `compare` v' of GT -> [a]; EQ -> a : c; LT -> c
В функцию main записываем строку
    grpSeqsHt 70000 2 (subSeqs 10 $ seqs !! 1) >>= mostOftenSeqsHt >>= print
Функция grpSeqsHt создает хэш-таблицу исходного размера m (величина 70000 подобрана эмпирическим путем: примерно столько уникальных k-меров генерируется из seqs !! 1). Ключами таблицы являются значения k-меров, значениями — их общее количество в исходных данных. Если при добавлении новой ячейки ее еще не существует в таблице (это проверяется функцией H.lookup), то она создается со значением 1, иначе соответствующее ей значение увеличивается на единицу. Функция mostOftenSeqsHt проходится по уже заполненной таблице и возвращает список наиболее часто встречающихся k-меров (заметьте отличие: первый вариант решения задачи формировал список всех k-меров в порядке уменьшения их частоты в исходной последовательности). Второй вариант с использованием хэш-таблицы учитывает тот факт, что вычисление списка наиболее часто встречающихся k-меров можно произвести прямо во время заполнения таблицы внутри функции grpSeqsHt, и функция mostOftenSeqsHt становится ненужной. Итак, добавляем в список импортируемых модулей
import Control.Monad (foldM)
, в определения
grpSeqsHt' :: Int -> Int -> [String] ->
    IO (HashTable String Int, [(String, Int)])
grpSeqsHt' m n s = do
    h <- H.newSized m
    let insert' h k =
            H.lookup h k >>=
                (\v -> H.insert h k v >> return (k, v)) . (+1) . fromMaybe 0
        findMax [] a = return [a]
        findMax c@((_, v') : _) a@(_, v) =
            return $ case v `compare` v' of GT -> [a]; EQ -> a : c; LT -> c
    c <- foldM (\c s -> foldM
                    (\c k -> insert' h k >>= findMax c) c $ nearSeqs n s) [] s
    return (h, c)
, и в функцию main
    grpSeqsHt' 70000 2 (subSeqs 10 $ seqs !! 1) >>= print . snd
В этом случае использование хэш-таблицы становится побочным артефактом реализации, поскольку за пределами функции grpSeqsHt’ она не используется, несмотря на то, что grpSeqsHt’ возвращает ее как первый элемент кортежа. Как ни странно, варианты с хэш-таблицей оказались медленнее. Так, первый вариант выдал
time ./gen 
[("GCACACAGAC",20),("GCGCACACAC",20)]

real    0m0.656s
user    0m0.643s
sys     0m0.011s
, а второй
time ./gen 
[("GCGCACACAC",20),("GCACACAGAC",20)]

real    0m0.676s
user    0m0.664s
sys     0m0.009s
Ну и, собственно, чемпион! Возьмем исходное решение и чуточку изменим алгоритм. Предположим, что список, возвращенный из функции subSeqs, может содержать дубликаты (как оказалось, seqs !! 1 действительно содержит 98 дубликатов k-меров при их общем числе 352). Для дубликатов не нужно получать список мутаций каждый раз заново: достаточно получить список единожды и реплицировать его по числу дубликатов. Давайте просто перепишем функцию grpSeqs в предположении, что она по-прежнему будет получать данные из subSeqs.
grpSeqs :: Int -> [String] -> [[String]]
grpSeqs n = L.sortBy (flip $ comparing length) . L.group . L.sort . seqs
    where seqs = concatMap (\(s, m) -> concat . replicate m . nearSeqs n $ s) .
                    map (head &&& length) . L.group . L.sort
Вывод программы:
time ./gen 
("GCACACAGAC",20)
("GCGCACACAC",20)
("ACACACACAC",19)
("CCCGCACACA",19)
("CACACACACG",18)
("CGCGCACACA",18)
("CGGCACACAC",18)
("GCACACACCC",18)
("GCACACACGC",18)
("GCCCGCACAC",18)

real    0m0.546s
user    0m0.525s
sys     0m0.018s