2016-10-04 9 views
2

저는 파이프 생태계와 파이프 동시성을 가진 스트리밍 기능을 작성하고 있습니다.이 기능은 작동 라이브러리를 기반으로하여 제가 산출하는 작은 프로그램 조각을 빨리 만들 수있게합니다 네트워크를 통해 서버에 명령을 보내거나 쉘 명령에서 stdin/out 명령을 실행 한 다음 응답을 다시 읽습니다. 이 경우에는 별표이지만 비슷한 것으로 일반화 될 수 있습니다.다른 값을 반환하는 소비자와 생산자와 파이프 연결

처음에는 파이프를 염두에두고 작성했지만 작동하지 않습니다. 다음 코드가 작동하지 않는 이유는 astPipe가 Pipe _ _ IO a을 반환하고 파이프와 동시성 모두에서 i와 o가 모두 Consumer/Producer _ IO()을 반환하기 때문입니다. astPipe 수확량이 Maybe ByteString 인 것을 생각한 다음 출력을 Consumer으로 만들었습니다. Maybe ByteString을 소비하지만 여전히 을 반환하는 문제는 해결되지 않습니다.

나는 정말로 내가 해결책에 가깝다고 느낀다. 그러나 나는 그것을 아주 조용하게 할 수 없다. 이 파일에 스택을 실행하여 복제 할 수 있어야합니다.

#!/usr/bin/env stack 
-- stack --resolver lts-6.20 runghc --package pipes --package pipes-concurrency --package operational --package process-streaming 

{-# LANGUAGE OverloadedStrings, LambdaCase #-} 
{-# LANGUAGE ScopedTypeVariables #-} 
{-# LANGUAGE GADTs #-} 
module West.Asterisk where 

import System.Process.Streaming as PS 
import Control.Monad.Operational as Op 

import Pipes as P 
import Pipes.Concurrent as PC; 

import qualified Data.ByteString.Char8 as B 

import Control.Concurrent.Async 

import GHC.IO.Exception (ExitCode) 

data Version = Version String 
data Channels = Channels 

data AsteriskInstruction a where 
    Login :: AsteriskInstruction (Maybe Version) 
    CoreShowChannels :: AsteriskInstruction (Maybe Channels) 

type Asterisk a = Program AsteriskInstruction a 

runAsterisk :: forall a. Asterisk a -> IO a 
runAsterisk m = 
    let 

    runAsterisk' :: Producer B.ByteString {- TODO Response -} IO() -> Consumer B.ByteString IO() -> Asterisk a -> IO a 
    runAsterisk' i o m' = runEffect $ i >-> astPipe m' >-> o 
     where 
     astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a 
     astPipe k = 
      case Op.view m' of 

      Return a -> return a 

      Login :>>= k -> do 
       yield logincmd 
       resp <- await -- :: Response 
       let v = undefined resp :: Maybe Version 
       astPipe (k v) 

      CoreShowChannels :>>= k -> do 
       yield coreshowchannelscmd 
       resp <- await 
       let c = undefined resp :: Maybe Channels 
       astPipe (k c) 

    in do 
    withSpawn unbounded $ \(out1, in1) -> do 
     async $ asteriskManager (fromInput in1) (toOutput out1) 
     runAsterisk' (fromInput in1) (toOutput out1) m 

asteriskManager :: Producer B.ByteString IO() -> Consumer B.ByteString IO() -> IO ExitCode 
asteriskManager prod cons = do 
    let ssh = shell "nc someserver 5038" 
    execute (piped ssh) (foldOut (withConsumer cons) *> feedProducer prod *> exitCode) 


logincmd, coreshowchannelscmd :: B.ByteString 
logincmd = "action: login\nusername: username\nsecret: pass\nevents: off\n\n" 
coreshowchannelscmd = "action: coreshowchannels\n\n" 

오류 :

Blah.hs:38:45: 
    Couldn't match type ‘a’ with ‘()’ 
     ‘a’ is a rigid type variable bound by 
      the type signature for runAsterisk :: Asterisk a -> IO a 
      at Blah.hs:33:23 
    Expected type: Proxy() B.ByteString() B.ByteString IO() 
     Actual type: Pipe B.ByteString B.ByteString IO a 
    Relevant bindings include 
     astPipe :: Asterisk a -> Pipe B.ByteString B.ByteString IO a 
     (bound at Blah.hs:41:9) 
     m' :: Asterisk a (bound at Blah.hs:38:22) 
     runAsterisk' :: Producer B.ByteString IO() 
         -> Consumer B.ByteString IO() -> Asterisk a -> IO a 
     (bound at Blah.hs:38:5) 
     m :: Asterisk a (bound at Blah.hs:34:13) 
     runAsterisk :: Asterisk a -> IO a (bound at Blah.hs:34:1) 
    In the second argument of ‘(>->)’, namely ‘astPipe m'’ 
    In the first argument of ‘(>->)’, namely ‘i >-> astPipe m'’ 
+0

형식 서명을 수정할 수있는 것처럼 보입니다. 당신은'Consumer B.ByteString IO a' 대신에 타입 서명에'Consumer B.ByteString IO()'를 가지고 있습니다. 'runEffect'는 당신의 전반적인'Effect'가 리턴 한 것을 리턴하고, 그것이'Consumer B.ByteString IO()'이어야한다고 말했지만,'runEffect'는'IO a'를 리턴해야합니다. – bheklilr

+0

http://hackage.haskell.org/package/pipes-concurrency-2.0.6/docs/Pipes-Concurrent.html#v:fromInput이 명시 적으로 'Producer'를 반환하기 때문에 작동하지 않습니다. am()'.'toOutput '비슷합니다. 나는 그들을 어떻게 든 개조하려고 생각했다. 그러나 그것이 의미가 있는지 나는 확신하지 않는다. –

답변

2

Producer들과 ()를 반환 Consumer의 스스로 중지 할 수 있습니다. 반환 유형에서 다형성 인 ProducerConsumer은 그 자체로 결코 중단되지 않습니다.

케이스의 반환 유형을 통합하려면 fmap을 사용하여 Either의 다른 가지에 각각을 입력하십시오. 파이프 라인을 중단하는 성분 공개 될 Either에 일치

runAsterisk' :: Producer B.ByteString IO() 
      -> Consumer B.ByteString IO() 
      -> Asterisk a 
      -> IO (Either() a) 
runAsterisk' i o m' = runEffect $ fmap Left i >-> fmap Right (astPipe m') >-> fmap Left o 

패턴.

또한, 당신은 그 자체로 결코 멈추지 않는다 소비자에 Consumer a IO()를 변환하는 drain를 사용할 수 있습니다 원래 소비자 정지가 삭제됩니다 후

neverStop :: Consumer a IO() -> Consumer a IO r 
neverStop consumer = consumer *> drain 

모든 입력을 받았다.

+0

이것은 정확히 내가 바라는 바입니다. 보너스 : 프로그램이 제대로 종료되었는지 또는 서버가 예기치 않게 나를 멈추게하는지 알 수 있습니다. –