7

관찰 대상의 순환 체인에 문제가 있습니다.RxJS : 관찰 대상과 단일 관찰자의 재귀 목록

저는 현재 버전 1.0.10621 인 RxJS와 함께 작업하고 있으며 Rx for jQuery와 함께 가장 기본적인 Rx 기능을 포함하고 있습니다.

내 문제의 예제 시나리오를 소개해 드리겠습니다. 특정 키워드가 포함 된 짹짹/업데이트에 대해 Twitter search API (JSON 응답)을 폴링합니다. 또한 응답에는 후속 요청을 생성하는 데 사용해야하는 "refresh_url"도 포함됩니다. 해당 후속 요청에 대한 응답에는 새로운 refresh_url 등이 다시 포함됩니다.

Rx.jQuery를 사용하면 Twitter 검색 API에서 관찰 가능한 이벤트를 호출하여 onNext를 생성 한 다음 완료 할 수 있습니다. 지금까지 시도한 것은 onNext 핸들러가 refresh_url을 기억하고 onCompleted 핸들러에서 사용하여 다음 관찰을 위해 새로운 관찰 가능 객체와 해당 관찰자를 생성하는 것입니다. 이렇게하면, 관찰 가능 + 관찰자 쌍 중 하나가 다른 관찰자와 무한히 뒤 따른다.

이 방법의 문제점은 다음과 같습니다 그들의 전임자가 아직 처리되지 않은 경우

  1. 추시 관찰/관찰자가 이미 살아있다.

  2. 실제로 살아있는 관찰자에 대한 유효한 참조를 유지하기 위해 많은 부기가 이루어져야합니다. 실제로는 2 개가 될 수 있습니다. (하나는 oncompleted에, 다른 하나는 라이프 사이클에있는 다른 곳) 물론이 참조는 관찰자의 구독 취소/삭제에 필요합니다. 부기에 대한 대안은 나의 예에서와 같이 "여전히 실행 중입니까?"- 부울 값으로 부작용을 구현하는 것입니다.

예제 코드는 :

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

는 트윗에 요청에서 관찰 가능한/관찰자의 더블 레이어에 속지 마십시오. 필자의 예제는 주로 첫 번째 레이어 인 Twitter에 데이터를 요청하는 것과 관련이 있습니다. 이 문제를 푸는 경우 두 번째 레이어 (트윗으로 응답 변환)가 첫 번째 레이어와 하나가 될 수 있습니다. 그러나 나는 그것이 완전히 다른 것이라고 생각한다. 지금은.

Erik Meijer가 Expand 연산자를 내게 지적했으며 (아래 예제 참조) Join patterns을 제안했습니다.

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

이 값은 LINQPad에 복사 가능해야합니다. 그것은 싱글 톤 관찰 가능을 가정하고 최종 관찰자 한 명을 생성합니다.

제 질문은 다음과 같습니다. 어떻게 RxJS에서 확장 트릭을 할 수 있습니까?

EDIT :
확장 연산자는 아마도 this thread과 같이 구현 될 수 있습니다. 하지만 하나는 generators이 필요합니다 (JS < 1.6 만).
불행히도 RxJS 2.0.20304-beta은 Extend 메서드를 구현하지 않습니다.

+2

이 문제의 해결 방법은 실제로 [확장 연산자] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9)은 RxJS 버전 2.0.20304-beta까지 아직 구현되지 않았습니다. – derabbink

+1

확장이 필요합니다. 이 게시물은 오래되었지만 Rx의 향후 버전에서 필요한 운영자를 자신의 Rx 버전으로 이식하는 것이 가능합니다. 몇 가지 조작이 필요할 수 있지만 코드베이스는 이전 v1에서 크게 변경되지 않았습니다. –

+0

또한 가능한 경우 Rx를 업그레이드하는 것이 좋습니다. 최신 버전에는 많은 버그 수정 및 개선 사항이 있습니다. –

답변

4

그래서 나는 당신이했던 것과 약간 다른 문제를 풀려고하고 좀 더 쉽게 해결할 수있는 자유를 시도하려고합니다.

그래서 난 당신이 다음에 시도되는 알 수없는 한 것은

  • 현재 옵저버 onNext,
  • 일단
  • 목록을 트윗받은 다음 URL을 포함하는 첫번째 트윗 목록을 얻고 얻을 설정 단계 트윗 다음 세트는
    • 는이 작업을 수행 무기한

또는 사용자 작업이 있습니다 (더 가져 오기/아래로 스크롤). 어느 쪽이든, 그것은 정말로 같은 문제입니다. 나는 당신의 문제를 잘못 읽고있을 수도 있습니다. 여기에 그 대답이 있습니다.

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

이것은 사용자가 요청한 답변이 아닐 수도 있습니다. 그렇지 않다면, 미안 해요!


편집 : 코드를 살펴본 후 결과를 배열로 가져 와서 관찰하고 싶은 것처럼 보입니다.

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat();