GitHub의 FSharpx 버전에는 이미 수정 사항이 포함되어 있는 것으로 보입니다. 그러나 NuGet의 현재 버전(1.8.41)은 이 수정 사항을 포함하도록 업데이트되지 않았습니다. 변경 사항은 here를 참조하십시오.
편집 1: GitHub의 코드는 재생(replay) 의미론을 가진 Observable에서도 문제가 있습니다. 저는 다음과 같이 고쳤지만, 더 깔끔한 해결책이 있기를 바랍니다. 좀 더 단순하게 만들 수 있는 방법이 있는지 생각해 본 뒤에 PR을 제출하겠습니다.
/// Creates an asynchronous workflow that will be resumed when the
/// specified observable produces a value. The workflow will return
/// the value produced by the observable.
///
/// An error reported by the observable resumes the workflow through the
/// exception continuation. Completion of the observable resumes it through
/// the cancellation continuation with an OperationCanceledException.
///
/// NOTE(review): `synchronize` is not defined in this snippet; it is presumably
/// a helper that marshals the continuation back to the caller's
/// SynchronizationContext -- confirm against the enclosing file.
///
/// NOTE(review): `finish` calls `remove()` before `setRemover` has stored the
/// subscription if the observable invokes the observer synchronously inside
/// `Subscribe`. In that case `remove()` finds `None`, and the subscription
/// stored afterwards is not disposed by `finish`. Nothing in this block guards
/// against a continuation being invoked more than once if the observer is
/// called repeatedly.
static member AwaitObservable(observable : IObservable<'T1>) =
// Holds the active subscription (if any); guarded by removeLock.
let removeObj : IDisposable option ref = ref None
let removeLock = new obj()
// Stores the subscription so that a later remove() can dispose it.
let setRemover r =
lock removeLock (fun() -> removeObj := Some r)
// Disposes the stored subscription, if there is one, and clears the slot so
// that the same subscription is not disposed a second time.
let remove() =
lock removeLock (fun() ->
match !removeObj with
| Some d -> removeObj := None
d.Dispose()
| None ->())
// `f` is the function handed over by `synchronize`; every continuation is
// invoked through it.
synchronize (fun f ->
let workflow =
Async.FromContinuations((fun (cont,econt,ccont) ->
// Unsubscribes first, then invokes the chosen continuation via `f`.
// The `cont` parameter shadows the success continuation so that the same
// helper serves all three observer callbacks.
let rec finish cont value =
remove()
f (fun() -> cont value)
// Subscribe and remember the subscription for later disposal.
setRemover <|
observable.Subscribe
({ new IObserver<_> with
// A value resumes the workflow with that value.
member x.OnNext(v) = finish cont v
// An error resumes the workflow through the exception continuation.
member x.OnError(e) = finish econt e
// Completion is reported through the cancellation continuation.
member x.OnCompleted() =
let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
finish ccont (new System.OperationCanceledException(msg)) })
()))
async {
let! cToken = Async.CancellationToken
let token : CancellationToken = cToken
// On cancellation the callback only unsubscribes via remove(); it does not
// invoke any continuation itself. The NET40 build uses the Register overload
// that takes just a callback; other builds pass a callback plus a null state.
#if NET40
use registration = token.Register(fun() -> remove())
#else
use registration = token.Register((fun _ -> remove()), null)
#endif
return! workflow
})
/// Creates an asynchronous workflow that is resumed by the first notification
/// of the given observable that reaches the guarded continuation.
///
/// A value resumes the workflow with that value, an error resumes it through
/// the exception continuation, and completion resumes it through the
/// cancellation continuation with an OperationCanceledException.
///
/// Compared with a plain subscribe-and-resume implementation, this version
/// keeps a `continued` flag so that a continuation is invoked only once, and it
/// disposes the subscription immediately when the flag is already set by the
/// time `Subscribe` returns.
static member AwaitObservable(observable : IObservable<'T1>) =
// Captures the SynchronizationContext current at call time and hands `f` a
// function that runs a callback `g` either by posting it to that captured
// context (when it is non-null and differs from the context current at
// callback time) or by calling it directly.
let synchronize f =
let ctx = System.Threading.SynchronizationContext.Current
f (fun g ->
let nctx = System.Threading.SynchronizationContext.Current
if ctx <> null && ctx <> nctx then ctx.Post((fun _ -> g()), null)
else g())
// Set once a continuation has been invoked; guarded by continuedLock.
let continued = ref false
let continuedLock = new obj()
// Holds the active subscription (if any); guarded by removeLock.
let removeObj : IDisposable option ref = ref None
let removeLock = new obj()
// Stores the subscription so that a later remove() can dispose it.
let setRemover r =
lock removeLock (fun() -> removeObj := Some r)
// Disposes the stored subscription, if there is one, and clears the slot so
// that the same subscription is not disposed a second time.
let remove() =
lock removeLock (fun() ->
match !removeObj with
| Some d ->
removeObj := None
d.Dispose()
| None ->())
synchronize (fun f ->
let workflow =
Async.FromContinuations((fun (cont,econt,ccont) ->
// Unsubscribes, then (via `f`) invokes the chosen continuation unless
// one has already been invoked. The continuation runs while
// continuedLock is held.
let rec finish cont value =
remove()
f (fun() -> lock continuedLock (fun() ->
if not !continued then
cont value
continued := true))
// Despite its name, `observer` is the value returned by Subscribe and is
// used below as the disposable subscription handle.
let observer =
observable.Subscribe
({ new IObserver<_> with
member __.OnNext(v) = finish cont v
member __.OnError(e) = finish econt e
member __.OnCompleted() =
let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
finish ccont (new System.OperationCanceledException(msg)) })
// If a continuation already ran by the time Subscribe returned, dispose
// the subscription right away; otherwise store it for remove().
lock continuedLock (fun() -> if not !continued then setRemover observer else observer.Dispose())
()))
async {
let! cToken = Async.CancellationToken
let token : CancellationToken = cToken
// On cancellation the callback only unsubscribes via remove(); it does not
// invoke any continuation itself.
use __ = token.Register((fun _ -> remove()), null)
return! workflow
})
편집 2: 핫 옵저버블(hot observable) 문제에 대한 더 깔끔한 수정입니다...
/// Creates an asynchronous workflow that resumes with the first notification
/// obtained from `observable`, or with cancellation if the workflow's
/// cancellation token fires first.
///
/// A mailbox processor acts as the single rendezvous point: the cancellation
/// callback, the error handler, and the value handler each post a message,
/// and only the first message received decides which continuation runs.
///
///   * `Choice1Of3 value` -> success continuation with the value
///   * `Choice2Of3 exn`   -> exception continuation
///   * `Choice3Of3 exn`   -> cancellation continuation
///
/// NOTE(review): `Agent` is assumed to be an alias for `MailboxProcessor`
/// declared elsewhere, and `FirstAsync` / `Catch` / `Observable.Empty` are
/// assumed to come from Rx (System.Reactive) -- confirm against the file's opens.
let AwaitObservable(observable : IObservable<'T>) = async {
    let! token = Async.CancellationToken // capture the current cancellation token
    return! Async.FromContinuations(fun (cont, econt, ccont) ->
        // start a new mailbox processor which will await the result
        Agent.Start((fun (mailbox : Agent<Choice<'T, exn, OperationCanceledException>>) ->
            async {
                // register a callback with the cancellation token which posts a cancellation message;
                // the registration is disposed when this async block ends
#if NET40
                use __ = token.Register((fun _ ->
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The operation was cancelled.")))))
#else
                use __ = token.Register((fun _ ->
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The operation was cancelled.")))), null)
#endif
                // subscribe to the observable: if an error occurs post an error message and post the result otherwise;
                // the subscription is disposed when this async block ends
                use __ =
                    observable.FirstAsync()
                        .Catch(fun exn -> mailbox.Post(Choice2Of3 exn) ; Observable.Empty())
                        .Subscribe(fun result -> mailbox.Post(Choice1Of3 result))
                // wait for the first of these messages and call the appropriate continuation function
                let! message = mailbox.Receive()
                match message with
                | Choice1Of3 reply -> cont reply
                | Choice2Of3 exn -> econt exn
                | Choice3Of3 exn -> ccont exn })) |> ignore) }
제가 보기에 문제는 FSharpx의 AwaitObservable 정의가, 취소가 발생하기 전에 옵저버블 시퀀스에 다음 값(또는 오류)이 있는 경우에만 연속(continuation) 함수 중 하나를 호출한다는 점입니다. 이에 더해, 취소 연속을 호출하는 콜백을 취소 토큰에 등록하고, 그 결과로 얻은 CancellationTokenRegistration을 시퀀스의 다음 요소가 도착했을 때 해제(dispose)해야 합니다. 지금 이것을 구현할 방법을 찾고 있습니다. –