2015-01-31 4 views
4

Async.StartWithContinuations을 사용하여 시작된 비동기 워크 플로에서 Fsharpx 'Async.AwaitObservable을 사용하려고합니다. 어떤 이유로 워크 플로를 시작하는 데 사용 된 취소 토큰이 관찰 가능을 기다리는 동안 취소되지만 워크 플로의 다른 부분에서는 취소 토큰이 호출되지 않으면 취소 연속이 호출되지 않습니다. 그러나, 내가 use! __ = Async.OnCancel (interruption) 안에 넣으면 인터럽트 기능이 호출됩니다. 어떤 사람이 왜 이런 일이 일어나는지 명확하게 설명하고 최선의 방법은이 작업을 수행하고 연속 함수 중 하나가 항상 호출되는지 확인하십시오.Fsharpx Async.AwaitObservable이 취소 계속을 호출하지 않습니다.

open System 
open System.Reactive.Linq 
open FSharp.Control.Observable 
open System.Threading 

[<EntryPoint>] 
let main _ = 
    let cancellationCapability = new CancellationTokenSource() 

    let tick = Observable.Interval(TimeSpan.FromSeconds 1.0) 
    let test = async { 
     let! __ = Async.AwaitObservable tick 
     printfn "Got a thing." } 

    Async.StartWithContinuations(test, 
     (fun() -> printfn "Finished"), 
     (fun exn -> printfn "Error!"), 
     (fun exn -> printfn "Canceled!"), 
     cancellationCapability.Token) 

    Thread.Sleep 100 
    printfn "Cancelling..." 
    cancellationCapability.Cancel() 

    Console.ReadLine() |> ignore 
    0 // return an integer exit code 
+0

이 문제가 AwaitObservable의 FSharpx '정의 만 연속 함수 중 하나를 호출하는 것을 나에게 보인다 취소가 발생하기 전에 관찰 가능한 시퀀스에 다음 값 (또는 오류)이 있으면 true입니다. 또한 취소 연속을 호출하는 취소 토큰으로 콜백을 등록하고 결과 CancellationTokenRegistration을 시퀀스의 다음 요소에 배치해야합니다. 지금 이것을 구현할 방법을 찾고 있습니다. –

답변

2

Ghub의 Fsharpx 버전에 이미 수정 프로그램이 포함되어있는 것으로 보입니다. 그러나 NuGet (1.8.41)의 현재 버전은이 수정 사항을 포함하도록 업데이트되지 않았습니다. 변경 here을 참조하십시오.

편집 1 : GitHub의 코드에는 재생 의미가있는 Observables에도 문제가 있습니다. 나는 이것을 이렇게 고쳤지만 잘하면 거기에 깨끗한 해결책이있다. 좀 더 단순하게 만들 수있는 방법이 있는지 생각한 후에 PR을 제출할 것입니다.

/// Creates an asynchronous workflow that will be resumed when the 
/// specified observables produces a value. The workflow will return 
/// the value produced by the observable. 
static member AwaitObservable(observable : IObservable<'T1>) = 
    let removeObj : IDisposable option ref = ref None 
    let removeLock = new obj() 
    let setRemover r = 
     lock removeLock (fun() -> removeObj := Some r) 
    let remove() = 
     lock removeLock (fun() -> 
      match !removeObj with 
      | Some d -> removeObj := None 
         d.Dispose() 
      | None ->()) 
    synchronize (fun f -> 
    let workflow = 
     Async.FromContinuations((fun (cont,econt,ccont) -> 
      let rec finish cont value = 
       remove() 
       f (fun() -> cont value) 
      setRemover <| 
       observable.Subscribe 
        ({ new IObserver<_> with 
         member x.OnNext(v) = finish cont v 
         member x.OnError(e) = finish econt e 
         member x.OnCompleted() = 
          let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed." 
          finish ccont (new System.OperationCanceledException(msg)) }) 
      ())) 
    async { 
     let! cToken = Async.CancellationToken 
     let token : CancellationToken = cToken 
     #if NET40 
     use registration = token.Register(fun() -> remove()) 
     #else 
     use registration = token.Register((fun _ -> remove()), null) 
     #endif 
     return! workflow 
    }) 

    static member AwaitObservable(observable : IObservable<'T1>) = 
     let synchronize f = 
      let ctx = System.Threading.SynchronizationContext.Current 
      f (fun g -> 
       let nctx = System.Threading.SynchronizationContext.Current 
       if ctx <> null && ctx <> nctx then ctx.Post((fun _ -> g()), null) 
       else g()) 

     let continued = ref false 
     let continuedLock = new obj() 
     let removeObj : IDisposable option ref = ref None 
     let removeLock = new obj() 
     let setRemover r = 
      lock removeLock (fun() -> removeObj := Some r) 
     let remove() = 
      lock removeLock (fun() -> 
       match !removeObj with 
       | Some d -> 
        removeObj := None 
        d.Dispose() 
       | None ->()) 
     synchronize (fun f -> 
     let workflow = 
      Async.FromContinuations((fun (cont,econt,ccont) -> 
       let rec finish cont value = 
        remove() 
        f (fun() -> lock continuedLock (fun() -> 
         if not !continued then 
          cont value 
          continued := true)) 
       let observer = 
        observable.Subscribe 
         ({ new IObserver<_> with 
          member __.OnNext(v) = finish cont v 
          member __.OnError(e) = finish econt e 
          member __.OnCompleted() = 
           let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed." 
           finish ccont (new System.OperationCanceledException(msg)) }) 
       lock continuedLock (fun() -> if not !continued then setRemover observer else observer.Dispose()) 
       ())) 
     async { 
      let! cToken = Async.CancellationToken 
      let token : CancellationToken = cToken 
      use __ = token.Register((fun _ -> remove()), null) 
      return! workflow 
     }) 

편집 2 : 뜨거운 관찰 문제에 대한 깔끔한 수정 ...

let AwaitObservable(observable : IObservable<'T>) = async { 
    let! token = Async.CancellationToken // capture the current cancellation token 
    return! Async.FromContinuations(fun (cont, econt, ccont) -> 
     // start a new mailbox processor which will await the result 
     Agent.Start((fun (mailbox : Agent<Choice<'T, exn, OperationCanceledException>>) -> 
      async { 
       // register a callback with the cancellation token which posts a cancellation message 
       #if NET40 
       use __ = token.Register((fun _ -> 
        mailbox.Post (Choice3Of3 (new OperationCanceledException("The opeartion was cancelled."))))) 
       #else 
       use __ = token.Register((fun _ -> 
        mailbox.Post (Choice3Of3 (new OperationCanceledException("The opeartion was cancelled.")))), null) 
       #endif 

       // subscribe to the observable: if an error occurs post an error message and post the result otherwise 
       use __ = 
        observable.FirstAsync() 
         .Catch(fun exn -> mailbox.Post(Choice2Of3 exn) ; Observable.Empty()) 
         .Subscribe(fun result -> mailbox.Post(Choice1Of3 result)) 

       // wait for the first of these messages and call the appropriate continuation function 
       let! message = mailbox.Receive() 
       match message with 
       | Choice1Of3 reply -> cont reply 
       | Choice2Of3 exn -> econt exn 
       | Choice3Of3 exn -> ccont exn })) |> ignore) } 
+0

니스. 구독이 누에 트 버전에서 제대로 처리되지 않았기 때문에 그 연속이 불려 왔을 지 몰랐다. – scrwtp

+0

아니요, NuGet 버전이 콜백을 취소 토큰으로 등록하지 않았다고 생각합니다. –

2

나는 그것이 AwaitObservable의 구현 방법에 문제가 있다고 생각합니다. 그것을 고치는 행운을 비네.

async { 
    let! ct = Async.CancellationToken 
    let! __ = 
     Async.StartAsTask(Async.AwaitObservable tick, cancellationToken = ct) 
     |> Async.AwaitTask 
    printfn "Got a thing." 
} 

적합하지 않습니다,하지만 작품 :

는 클라이언트 측 코드에서 사용할 수있는 하나의 해결 방법은 작업에 AwaitObservable 포장되어 말했다.

+0

감사 : 이것은 지금 당장 트릭을 할 수 있습니다! 필자는 여기있는 곳에서 조금 벗어나기 때문에 그들이 무엇을하고 있는지 아는 사람이 적절한 수정을 찾을 때까지 유틸리티 라이브러리에서 그것을 숨길 수 있습니다. –