파일의 각 줄과 파일의 다른 줄을 비교하는 haskell 프로그램을 구현하고 있습니다. 이 O에 (N^2) 시간을 실행iteratee IO를 사용하는 Mapreduce 구현 (실제 세계 haskell)이 "너무 많은 파일 열기"로 실패합니다.
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
allDistances _ = 0
다음, 메모리에 전체 시간을 정수의 전체 목록을 유지하기 위해이 같은 단일 스레드 구현 될 수있다. 내 실제 프로그램에는 라인에 더 많은 숫자가 들어 있는데, 그 중 Int보다는 약간 더 복잡한 데이터 유형이 생성됩니다. 이로 인해 처리해야하는 데이터에 대한 메모리 오류가 발생했습니다.
따라서 위에서 언급 한 단일 스레드 솔루션에는 두 가지 개선 사항이 있습니다. 첫째, 실제 실행 시간을 빠르게하십시오. 둘째, 전체 목록을 전체 시간 메모리에 보관하지 않는 방법을 찾으십시오. 나는 이것이 전체 파일을 n 번 파싱해야한다는 것을 알고있다. 따라서 O (n^2) 비교와 O (n^2) 회선이 구문 분석됩니다. 차라리 실패한 프로그램보다 느린 성공적인 프로그램을 갖기 때문에 이것은 나에게 좋습니다. 입력 파일이 충분히 작 으면 나는 더 간단한 버전을 항상 사용할 수 있습니다.
여러 개의 CPU 코어를 사용하려면 Mapoverduce 구현을 Real World Haskell (24 장, 사용 가능 here)에서 가져 왔습니다.
나는 나는 또한에 프로그램을 원하기 때문에tails . lines . readFile
의 한 요소를 나타내는 각 청크와 라인만큼 덩어리를 반환하는 대신 덩어리에서 전체 파일을 분할의에 책에서 청크 기능을 수정 파일 크기를 확장 할 수 있기 때문에 처음에는 지연된 IO을 사용했습니다. 그러나 이것은 "열린 파일이 너무 많습니다"라는 질문에 대해서는 previous question (파일 핸들이 GC에서 너무 늦게 처리됨)에서 요청한 경우 실패합니다. 완전한 lazy IO 버전이 거기에 게시됩니다.
대답이 설명하는대로 엄격한 IO으로 문제를 해결할 수 있습니다. 실제로 2k 라인 파일의 "너무 많은 열린 파일"문제는 해결되지만 50k 파일의 "메모리 부족"은 실패합니다.
첫 번째 단일 스레드 구현 (mapreduce 제외)은 50k 파일을 처리 할 수 있습니다.
나에게 가장 호소력이있는 대체 솔루션은 iteratee IO입니다. 나는 이것이 파일 핸들과 메모리 자원 고갈을 해결할 것으로 예상했다. 그러나 2k 라인 파일에서 "너무 많은 파일 열기"오류로 인해 구현이 실패합니다.
iteratee IO 버전은이 책에서와 같은 맵리 듀스 기능을 가지고 있지만, 그것이 열거 작업을 수 있도록 수정 chunkedFileEnum 있습니다.
내 질문은; 다음 iteratee IO 기반 구현의 문제점은 무엇입니까? 게으름은 어디에 있습니까?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + (sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
chunkOffset :: !Int
, chunkLength :: !Int
} deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
, BTW 나는 맥 OS X 10.6.7 (눈 표범) 다음과 같은 패키지와
에 HaskellPlatform 2011.2.0를 실행 해요 :
은 0.9.1.10
병렬 3.1.0을 bytestring.1
열거 자 0.4.8, 설명서 포함 here
병렬 처리가 너무 많습니다. chunkksize가 1이고 2k 라인 파일 인 경우 파일을 2k 번 열었습니다. 병렬 처리가 많을수록 더 많은 메모리를 사용할 수 있습니다. 필자는 실제로 이와 같은 문제가 자신과 구조를 교차해야한다고 생각하지는 않습니다. 선택한 병렬 처리 전략에 적합하다고 생각합니다. 합리적으로 큰 청크 크기를 설정하고 각 청크에서 계산 한 다음 청크에서 계산해야합니다. – sclv
이것에 대해 더 자세히 설명하기 위해 공간 및 디스크 읽기에서 선형 일 가능성이있는 작업을 수행하고이를 공간 및 디스크 읽기에서 n^2 인 작업으로 변환합니다. 읽기의 게으름과 엄격함은 모든 파일 검색 결과를 저장하기 위해 파일을 저장하지 못하거나 메모리가 부족한 사이에 발생합니다. 어쨌든 이것은 잘못된 접근입니다. – sclv
_pseq_는 모든 2k 스레드를 즉시 생성하지 않을 정도로 똑똑하지 않아야합니까? 나는 그들을 더 일자리라고 생각한다. 그것들은 언젠가 모두 끝내야하고 _pseq_를 사용하여 haskell에게 병렬로 실행함으로써 실행 시간을 최적화 할 수 있다고 말하려고합니다. – gerben