2011-05-02 7 views
10

파일의 각 줄과 파일의 다른 줄을 비교하는 haskell 프로그램을 구현하고 있습니다. 이 O에 (N^2) 시간을 실행iteratee IO를 사용하는 Mapreduce 구현 (실제 세계 haskell)이 "너무 많은 파일 열기"로 실패합니다.

distance :: Int -> Int -> Int 
distance a b = (a-b)*(a-b) 

sumOfDistancesOnSmallFile :: FilePath -> IO Int 
sumOfDistancesOnSmallFile path = do 
       fileContents <- readFile path 
       return $ allDistances $ map read $ lines $ fileContents 
       where 
        allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs) 
        allDistances _ = 0 

다음, 메모리에 전체 시간을 정수의 전체 목록을 유지하기 위해이 같은 단일 스레드 구현 될 수있다. 내 실제 프로그램에는 라인에 더 많은 숫자가 들어 있는데, 그 중 Int보다는 약간 더 복잡한 데이터 유형이 생성됩니다. 이로 인해 처리해야하는 데이터에 대한 메모리 오류가 발생했습니다.

따라서 위에서 언급 한 단일 스레드 솔루션에는 두 가지 개선 사항이 있습니다. 첫째, 실제 실행 시간을 빠르게하십시오. 둘째, 전체 목록을 전체 시간 메모리에 보관하지 않는 방법을 찾으십시오. 나는 이것이 전체 파일을 n 번 파싱해야한다는 것을 알고있다. 따라서 O (n^2) 비교와 O (n^2) 회선이 구문 분석됩니다. 차라리 실패한 프로그램보다 느린 성공적인 프로그램을 갖기 때문에 이것은 나에게 좋습니다. 입력 파일이 충분히 작 으면 나는 더 간단한 버전을 항상 사용할 수 있습니다.

여러 개의 CPU 코어를 사용하려면 Mapoverduce 구현을 Real World Haskell (24 장, 사용 가능 here)에서 가져 왔습니다.

나는 나는 또한에 프로그램을 원하기 때문에

tails . lines . readFile 

의 한 요소를 나타내는 각 청크와 라인만큼 덩어리를 반환하는 대신 덩어리에서 전체 파일을 분할의에 책에서 청크 기능을 수정 파일 크기를 확장 할 수 있기 때문에 처음에는 지연된 IO을 사용했습니다. 그러나 이것은 "열린 파일이 너무 많습니다"라는 질문에 대해서는 previous question (파일 핸들이 GC에서 너무 늦게 처리됨)에서 요청한 경우 실패합니다. 완전한 lazy IO 버전이 거기에 게시됩니다.

대답이 설명하는대로 엄격한 IO으로 문제를 해결할 수 있습니다. 실제로 2k 라인 파일의 "너무 많은 열린 파일"문제는 해결되지만 50k 파일의 "메모리 부족"은 실패합니다.

첫 번째 단일 스레드 구현 (mapreduce 제외)은 50k 파일을 처리 할 수 ​​있습니다.

나에게 가장 호소력이있는 대체 솔루션은 iteratee IO입니다. 나는 이것이 파일 핸들과 메모리 자원 고갈을 해결할 것으로 예상했다. 그러나 2k 라인 파일에서 "너무 많은 파일 열기"오류로 인해 구현이 실패합니다.

iteratee IO 버전은이 책에서와 같은 맵리 듀스 기능을 가지고 있지만, 그것이 열거 작업을 수 있도록 수정 chunkedFileEnum 있습니다.

내 질문은; 다음 iteratee IO 기반 구현의 문제점은 무엇입니까? 게으름은 어디에 있습니까?

import Control.Monad.IO.Class (liftIO) 
import Control.Monad.Trans (MonadIO, liftIO) 
import System.IO 

import qualified Data.Enumerator.List as EL 
import qualified Data.Enumerator.Text as ET 
import Data.Enumerator hiding (map, filter, head, sequence) 

import Data.Text(Text) 
import Data.Text.Read 
import Data.Maybe 

import qualified Data.ByteString.Char8 as Str 
import Control.Exception (bracket,finally) 
import Control.Monad(forM,liftM) 
import Control.Parallel.Strategies 
import Control.Parallel 
import Control.DeepSeq (NFData) 
import Data.Int (Int64) 

--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances 

--My operation for one value pair 
distance :: Int -> Int -> Int 
distance a b = (a-b)*(a-b) 

combineDistances :: [Int] -> Int 
combineDistances = sum 

--Test file generation 
createTestFile :: Int -> FilePath -> IO() 
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1 
     where infiniteList :: Int->Int-> [Int] 
       infiniteList i j = (i + j) : infiniteList j (i+j) 

--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000) 
--But i want to use multiple cores.. 
sumOfDistancesOnSmallFile :: FilePath -> IO Int 
sumOfDistancesOnSmallFile path = do 
        fileContents <- readFile path 
        return $ allDistances $ map read $ lines $ fileContents 
        where 
         allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs) 
         allDistances _ = 0 

--Setting up an enumerator of read values from a text stream 
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b 
readerEnumerator reader = joinI . (EL.concatMapM transformer) 
          where transformer input = case reader input of 
             Right (val, remainder) -> return [val] 
             Left err -> return [0] 

readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b 
readEnumerator = readerEnumerator (signed decimal) 

--The iteratee version of my operation 
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int 
distancesFirstToTailIt = do 
    maybeNum <- EL.head 
    maybe (return 0) distancesOneToManyIt maybeNum 

distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int 
distancesOneToManyIt base = do 
    maybeNum <- EL.head 
    maybe (return 0) combineNextDistance maybeNum 
    where combineNextDistance nextNum = do 
       rest <- distancesOneToManyIt base 
       return $ combineDistances [(distance base nextNum),rest] 

--The mapreduce algorithm 
mapReduce :: Strategy b -- evaluation strategy for mapping 
      -> (a -> b) -- map function 
      -> Strategy c -- evaluation strategy for reduction 
      -> ([b] -> c) -- reduce function 
      -> [a]  -- list to map over 
      -> c 
mapReduce mapStrat mapFunc reduceStrat reduceFunc input = 
      mapResult `pseq` reduceResult 
      where mapResult = parMap mapStrat mapFunc input 
       reduceResult = reduceFunc mapResult `using` reduceStrat 

--Applying the iteratee operation using mapreduce 
sumOfDistancesOnFileWithIt :: FilePath -> IO Int 
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path 

distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int 
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc) 
             rpar (sumValuesAsReduceFunc) 
          where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int 
            runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt)) 
            sumValuesAsReduceFunc :: [IO Int] -> IO Int 
            sumValuesAsReduceFunc = liftM sum . sequence 


--Working with (file)chunk enumerators: 
data ChunkSpec = CS{ 
    chunkOffset :: !Int 
    , chunkLength :: !Int 
    } deriving (Eq,Show) 

chunkedFileEnum :: (NFData (a)) => MonadIO m => 
       (FilePath-> IO [ChunkSpec]) 
      -> ([Enumerator Text m b]->IO a) 
      -> FilePath 
      -> IO a 
chunkedFileEnum chunkCreator funcOnChunks path = do 
    (chunks, handles)<- chunkedEnum chunkCreator path 
    r <- funcOnChunks chunks 
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles 

chunkedEnum :: MonadIO m=> 
       (FilePath -> IO [ChunkSpec]) 
      -> FilePath 
      -> IO ([Enumerator Text m b], [Handle]) 
chunkedEnum chunkCreator path = do 
    chunks <- chunkCreator path 
    liftM unzip . forM chunks $ \spec -> do 
     h <- openFile path ReadMode 
     hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec)) 
     let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF 
     return (chunk,h) 

-- returns set of chunks representing tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec] 
chunkByLinesTails path = do 
    bracket (openFile path ReadMode) hClose $ \h-> do 
     totalSize <- fromIntegral `liftM` hFileSize h 
     let chunkSize = 1 
      findChunks offset = do 
      let newOffset = offset + chunkSize 
      hSeek h AbsoluteSeek (fromIntegral newOffset) 
      let findNewline lineSeekOffset = do 
       eof <- hIsEOF h 
       if eof 
        then return [CS offset (totalSize - offset)] 
        else do 
         bytes <- Str.hGet h 256 
         case Str.elemIndex '\n' bytes of 
          Just n -> do 
           nextChunks <- findChunks (lineSeekOffset + n + 1) 
           return (CS offset (totalSize-offset):nextChunks) 
          Nothing -> findNewline (lineSeekOffset + Str.length bytes) 
      findNewline newOffset 
     findChunks 0 

, BTW 나는 맥 OS X 10.6.7 (눈 표범) 다음과 같은 패키지와
에 HaskellPlatform 2011.2.0를 실행 해요 :
은 0.9.1.10
병렬 3.1.0을 bytestring.1
열거 자 0.4.8, 설명서 포함 here

+4

병렬 처리가 너무 많습니다. chunkksize가 1이고 2k 라인 파일 인 경우 파일을 2k 번 열었습니다. 병렬 처리가 많을수록 더 많은 메모리를 사용할 수 있습니다. 필자는 실제로 이와 같은 문제가 자신과 구조를 교차해야한다고 생각하지는 않습니다. 선택한 병렬 처리 전략에 적합하다고 생각합니다. 합리적으로 큰 청크 크기를 설정하고 각 청크에서 계산 한 다음 청크에서 계산해야합니다. – sclv

+0

이것에 대해 더 자세히 설명하기 위해 공간 및 디스크 읽기에서 선형 일 가능성이있는 작업을 수행하고이를 공간 및 디스크 읽기에서 n^2 인 작업으로 변환합니다. 읽기의 게으름과 엄격함은 모든 파일 검색 결과를 저장하기 위해 파일을 저장하지 못하거나 메모리가 부족한 사이에 발생합니다. 어쨌든 이것은 잘못된 접근입니다. – sclv

+0

_pseq_는 모든 2k 스레드를 즉시 생성하지 않을 정도로 똑똑하지 않아야합니까? 나는 그들을 더 일자리라고 생각한다. 그것들은 언젠가 모두 끝내야하고 _pseq_를 사용하여 haskell에게 병렬로 실행함으로써 실행 시간을 최적화 할 수 있다고 말하려고합니다. – gerben

답변

3

오류 메시지에 열려있는 파일이 너무 많습니다. 하스켈이 프로그램의 대부분을 순차적으로 실행하기를 기대했지만 일부 프로그램은 평행하게 실행됩니다. 그러나 sclv에서 언급했듯이 Haskell은 항상 평가를합니다.

이것은 보통 순수한 기능 프로그램에서는 문제가되지 않지만 IO (자원)를 다룰 때는 문제가되지 않습니다. Real World Haskell 책에서 설명한대로 병렬 처리를 확장했습니다. 따라서 필자의 결론은 스파크 내 IO 리소스를 다룰 때 제한된 범위에서만 병렬 처리를 수행하는 것입니다. 순수한 기능 부분에서는 과도한 병렬 처리가 성공할 수 있습니다.

내 게시물에 대한 대답은 전체 프로그램에서 MapReduce를 사용하지 않고 순수한 내부 기능 부분 내에서 이루어집니다.

프로그램이 실제로 실패한 위치를 표시하려면 --enable-executable-profiling -p로 구성하고 빌드 한 다음 + RTS -p -hc -L30을 사용하여 실행했습니다. 실행 파일이 즉시 실패하기 때문에 메모리 할당 프로필이 없습니다.

                       individual inherited 
COST CENTRE    MODULE            no. entries %time %alloc %time %alloc 

MAIN      MAIN             1   0 0.0 0.3 100.0 100.0 
    main     Main            1648   2 0.0 0.0 50.0 98.9 
    sumOfDistancesOnFileWithIt MapReduceTest         1649   1 0.0 0.0 50.0 98.9 
     chunkedFileEnum  MapReduceTest          1650   1 0.0 0.0 50.0 98.9 
     chunkedEnum   MapReduceTest         1651   495 0.0 24.2 50.0 98.9 
      lineOffsets   MapReduceTest         1652   1 50.0 74.6 50.0 74.6 

chunkedEnum는 IO ([열거 텍스트 m B]와 [핸들])을 리턴하고, 명백하게 495 개 항목을 수신하고 다음 .prof 파일 생성 시간 할당 프로파일은 다음과 같이 시작한다. 입력 파일은 2k 라인 파일 이었으므로 lineOffsets의 단일 항목은 2000 개의 오프셋 목록을 반환했습니다. distanceUsingMapReduceIt에는 단일 항목이 없으므로 실제 작업이 시작되지 않았습니다!