2013-08-27 1 views
14

나는 간단한 생산자/소비자 모델을 가지고 있는데, 소비자가 어떤 상태를 생산자에게 돌려주고 싶어한다고 말한다. 예를 들어, 하류로 흐르는 객체를 파일에 기록하고자하는 객체로, 상류 객체를 파일에 기록 된 위치 (예 : 오프셋)로 나타내는 토큰으로 보자.관용적 인 양방향 파이프 손실이없는 다운 스트림 상태의 파이프

이 두 과정은이 같은 간단한

{-# LANGUAGE GeneralizedNewtypeDeriving #-} 

import Pipes 
import Pipes.Core 
import Control.Monad.Trans.State  
import Control.Monad 

newtype Object = Obj Int 
       deriving (Show) 

newtype ObjectId = ObjId Int 
       deriving (Show, Num) 

writeObjects :: Proxy ObjectId Object() X IO r 
writeObjects = evalStateT (forever go) (ObjId 0) 
    where go = do i <- get 
       obj <- lift $ request i 
       lift $ lift $ putStrLn $ "Wrote "++show obj 
       modify (+1) 

produceObjects :: [Object] -> Proxy X() ObjectId Object IO() 
produceObjects = go 
    where go [] = return() 
     go (obj:rest) = do 
      lift $ putStrLn $ "Producing "++show obj 
      objId <- respond obj 
      lift $ putStrLn $ "Object "++show obj++" has ID "++show objId 
      go rest 

objects = [ Obj i | i <- [0..10] ] 

, 나는 그것들을 구성하는 방법에 대한 추론 어려움의 공정한 조금 했어 수 있습니다 ( pipes-4.0와) 같이 보일 수 있습니다. 이상적으로, 우리는 상류 초기 ObjId 0을 보낸 데 request에 차단하여 다음과 같은 컨트롤의 푸시 기반 흐름,

  1. writeObjects 시작 싶어.
  2. produceObjects
  3. writeObjects 객체를 기록하고, 그 상태를 증가 하류 제 객체 Obj 0를 보내고 request에 대기 이번에 ObjId 0
  4. produceObjectsproduceObjects 복귀 상류 ObjId 1
  5. respond를 전송하는 단계로 진행 (2) 제 2의 대상과 함께, Obj 1

나의 최초의 시도는 (문제가있다 곳이 보인다) 푸시 기반 조성물

main = void $ run $ produceObjects objects >>~ const writeObjects 

참고 다음 const의 사용은 호환되지 않는 종류를 해결한다. 그러나이 경우, 우리는

Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 1 
Producing Obj 1 
... 

끌어 오기 기반의 접근 방식,

main = void $ run $ const (produceObjects objects) +>> writeObjects 

이 시간이 Obj 0을 떨어 뜨리고, 비슷한 문제를 겪고, ObjId 0 먹을됩니다 것을 찾을 수 있습니다.

원하는 방식으로이 조각들을 어떻게 구성 할 수 있습니까?

답변

14

사용할 구성의 선택은 전체 프로세스를 시작해야하는 구성 요소에 따라 다릅니다. 다운 스트림 파이프가 프로세스를 시작하게하려면 풀 기반 컴포지션 (예 : (>+>)/(+>>))을 사용하려고하지만 업스트림 파이프가 프로세스를 시작하게하려면 푸시 기반 구성 (즉, (>>~)/(>~>))을 사용해야합니다. . 유형 오류는 코드에 논리적 오류가 있음을 경고합니다. 즉, 어떤 구성 요소가 먼저 프로세스를 시작했는지 명확히 알지 못했습니다.

사용자의 설명에 따르면 제어 흐름이 produceObjects부터 시작되도록 푸시 기반 구성을 사용하려는 것이 분명합니다. 푸시 기반 구성을 사용하면 구성 운영자 유형이 코드 수정 방법에 대해 알아야 할 모든 것을 알려줍니다.나는 그것의 유형을 가지고 컴포지션 체인에 전문 수 있습니다 : 이미 눈치

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types 
(>>~) :: Server ObjectId Object IO() 
     -> (Object -> Client ObjectId Object IO()) 
     -> Effect IO() 

, 당신은 (>>~)를 사용하려고 할 때 당신이있어 유형의 오류가 당신에게 유형 Object의 인수 누락 된 사실을 말하여 writeObjects 기능. 이렇게하면 처음 Object (초기 인수를 통해)을 수신하기 전에 writeObjects에 코드를 실행할 수 없도록 정적으로 적용됩니다.

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj0 = evalStateT (go obj0) (ObjId 0) 
    where go obj = do i <- get 
        lift $ lift $ putStrLn $ "Wrote "++ show obj 
        modify (+1) 
        obj' <- lift $ request i 
        go obj' 

를이 다음 올바른 동작을 제공합니다 :이 요구 사항은 두 개의 파이프 중 하나가 걸리는 그 이유

>>> run $ produceObjects objects >>~ writeObjects 
Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 0 
Producing Obj 1 
Wrote Obj 1 
Object Obj 1 has ID ObjId 1 
Producing Obj 2 
Wrote Obj 2 
Object Obj 2 has ID ObjId 2 
Producing Obj 3 
Wrote Obj 3 
Object Obj 3 has ID ObjId 3 
Producing Obj 4 
Wrote Obj 4 
Object Obj 4 has ID ObjId 4 
Producing Obj 5 
Wrote Obj 5 
Object Obj 5 has ID ObjId 5 
Producing Obj 6 
Wrote Obj 6 
Object Obj 6 has ID ObjId 6 
Producing Obj 7 
Wrote Obj 7 
Object Obj 7 has ID ObjId 7 
Producing Obj 8 
Wrote Obj 8 
Object Obj 8 has ID ObjId 8 
Producing Obj 9 
Wrote Obj 9 
Object Obj 9 has ID ObjId 9 
Producing Obj 10 
Wrote Obj 10 
Object Obj 10 has ID ObjId 10 

당신은 궁금 할을

이 솔루션은 다음과 같이 당신의 writeObjects 기능을 재 작성하는 것입니다 초기의 논증은 그것이 범주법이 요구하는 것과 같은 추상적 인 정당화 이외에는 의미가있다. 일반 영어 설명은 writeObjects이 첫 번째 메시지 인 request에 도달하기 전에 두 파이프 사이에 첫 번째로 전송 된 버퍼가 Object 사이에 있어야한다는 것입니다. 이 접근법은 많은 문제가있는 동작과 버그가있는 케이스를 생성하지만 가장 중요한 문제는 파이프 구성이 더 이상 연관성이없고 효과 순서가 사물을 구성한 순서에 따라 변경된다는 것입니다.

양방향 파이프 작성 연산자의 좋은 점은 구성 요소가 "활성"(즉 제어 시작)인지 "수동"(즉, 입력 대기 중)인지 여부를 항상 추론 할 수 있도록하는 것입니다. 유형을 연구함으로써. 구성에 특정 파이프 (예 : writeObjects)가 인수를 사용해야한다고 표시되면 수동입니다. 인수가 없으면 (예 : produceObjects) 활성화되어 제어를 시작합니다. 따라서 컴포지션은 파이프 라인 내의 활성 파이프 (초기 인수를 사용하지 않는 파이프)와 제어를 시작하는 파이프를 하나씩 보유해야합니다.

4

'const'는 데이터를 삭제하는 곳입니다. 다음과 같이 모든 데이터를 얻기 위해, 당신은 아마 푸시 기반의 워크 플로우를 수행 할 : 우리는 메일 링리스트에이 문제를 논의했습니다

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj = go 0 obj 
    where 
    go objid obj = do 
     lift $ putStrLn $ "Wrote "++show obj 
     obj' <- request objid 
     go (objid + 1) obj' 

-- produceObjects as before 

main = void $ run $ produceObjects objects >>~ writeObjects 
2

을,하지만 난 여기에 내가 던질 거라고 생각 관심있는 사람들에게도 좋습니다.

당신의 문제는 당신이 서로에게 가치를 낼 준비가되어있는 두개의 코 루틴을 가지고 있다는 것입니다. 값을 산출하기 위해 상대방의 입력이 필요하지 않습니다. 그럼 누가 먼저 가야 하죠? 그럼 당신은 스스로 말했다 : 전송 한 요청을에 차단하여

writeObjects 시작 초기 ObjId 0 상류

다음 좋아, 우리가 ObjId 신호 이전에 대한 produceObjects 그래서 대기 지연 할 필요가 의미 해당 객체를 뱉어 낸다. (비록 분명히 이 필요하지 않더라도은 ID를 말했다).

프록시 내부로 들어가기 때문에, 이번에는 매우 신중하게 설명 할 필요가없는 마법의 주문이 있습니다.기본적인 아이디어는 당신이 그것을 필요로하기 전에 다음 입력을 필요로 할 때 입력을 적용하지만, 당신이 (당신이 아직 하나를 필요로하지 않더라도) 새로운 입력이 필요한 것처럼 다음 척하는 것입니다

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r 
delayD p0 b' = case p0 of 
    Request a' f -> Request a' (go . f) 
    Respond b g -> Respond b (delayD (g b')) 
    M m   -> M (liftM go m) 
    Pure r  -> Pure r 
    where 
    go p = delayD p b' 

지금, 당신은 produceObjects objects 대신 const에서이를 사용할 수 있으며, 두 번째 시도가 원하는대로 작동합니다

delayD (produceObjects objects) +>> writeObjects 

우리는 표준 파이프에서 그 장점에 포함이 레퍼토리 있는지 확인하기 위해 메일 링리스트에 delayD을 논의하고 있습니다.