2017-01-13 7 views
4

postgresql-simple은 스트리밍 쿼리 용 기능을 제공합니다.postgresql-simple을 사용하여 스트리밍 도관 소스 만들기

fold 
    :: (FromRow row, ToRow params) 
    => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a 

스트리밍을 최대한 활용하는 콘딧 소스를 만들고 싶습니다. IOfold에서 (내가 생각하세요?)를 contravariant 위치에 나타나기 때문에

mySource :: (FromRow row, Monad m) => Source m row 

불행하게도, 정말 종류와 사투를 벌인거야. 다음은 유형을 확인하지만 값을 산출하기 전에 전체 스트림을 폴드합니다.

getConduit :: Connection -> IO (C.ConduitM() Event IO()) 
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo 
    where 
    foo :: C.ConduitM() Event IO() -> Event -> IO (C.ConduitM() Event IO()) 
    foo cond evt = pure (cond >> C.yield evt) 

이것을 구현하는 방법에 대한 모든 의견을 매우 높이 평가할 것입니다. 감사! 마지막으로 만들기 위해 stm-conduit를 사용

+0

, 우리는 [PostgreSQL을-libpq를 (https://www.stackage.org/package/postgresql-libpq)이 동작을 구현하는 아래로 떨어졌다. postgresql-simple이 (TMChan 접근법 @Alec 언급과 함께)이 작업을 수행 할 수 있는지 확신하지 못합니다. –

+0

함수가있는'pipes-postgresql-simple'이 있습니다, ['query'] (https://hackage.haskell.org/package/pipes-postgresql-simple-0.1.2.0/docs/Pipes-PostgreSQL-Simple .html # v : query); [produceIO'] (https://github.com/ocharles/pipes-postgresql-simple/blob)의 구현에서 알 수 있듯이 기본적으로 @Alec이 언급 한 전략 ('pipes-concurrency'으로 구현 됨)을 사용하고 있습니다. /master/src/Pipes/PostgreSQL/Simple.hs#L117) – Michael

답변

5

이것에 대해 이동하는 한 (그래서 좋은되지 않음) 식으로

  • 그냥이 채널
  • 에 행을 덤프 foreach_를 설정 행을받을 수있는 새로운 TMChan을 채널 외부 출처

나는 이것을 시험 할 방법이 없지만 다음은 작동해야합니다.

즉, 컴파일 및 실행 위에
import Conduit 
import Database.PostgreSQL.Simple (foreach_) 
import Data.Conduit.TMChan (sourceTMChan) 
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically) 

mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row) 
mySource connection query = do 
    chan <- newTMChanIO 
    forEach_ connection query (atomically . writeTMChan chan) 
    pure (sourceTMChan chan) 

아니라 우리는이 쉬울 수 있습니다 forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m()) -> m()이 있다면 ...

0

여기에 알렉의의 수정입니다. mkPgSource은 알렉이 그의 게시물 마지막 부분에서 언급 한 일반적인 기능입니다. 지속적인-PostgreSQL의에서

import Database.PostgreSQL.Simple 
import Database.PostgreSQL.Simple.FromRow 
import Database.PostgreSQL.Simple.ToRow 
import Control.Monad.IO.Class (MonadIO) 
import Data.Conduit.TMChan (sourceTMChan) 
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, 
closeTMChan, TMChan) 
import GHC.Conc (atomically, forkIO) 
import Conduit 

--closes the channel after action is done to terminate the source 
mkPgSource :: (MonadIO m, FromRow r) => ((r -> IO()) -> IO()) -> IO (Source m r) 
mkPgSource action = do 
    chan <- newTMChanIO 
    _ <- forkIO $ do action $ atomically . (writeTMChan chan) 
       atomically $ closeTMChan chan 
    pure $ sourceTMChan chan 

sourceQuery :: (ToRow params, FromRow r, MonadIO m) => 
    Connection -> Query -> params -> IO (Source m r) 
sourceQuery conn q params = mkPgSource $ forEach conn q params 

sourceQuery_ :: (FromRow r, MonadIO m) => Connection -> Query -> IO 
(Source m r) 
sourceQuery_ conn q = mkPgSource $ forEach_ conn q