2017-03-29 4 views
1

아래에 설명 된 방법으로 couchbase 서버에서 대량 데이터를 가져옵니다. 내가 쿼리를RxJava와 couchbase에서 정렬을 사용하는 방법?

"SELECT meta().id as id FROM bucket" 

을 통과 할 때

bucket.async() 
      .query(N1qlQuery.simple(query)) 
      .doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t))) 
      .flatMap(AsyncN1qlQueryResult::rows) 
      .flatMap(row -> 
      bucket.async(). 
      get(row.value().getString("id"))) 
      .map(JsonDocument::content). 
      toList() 
      .toBlocking() 
      .single(); 

이 코드

은 잘 작동하지만

"SELECT meta().id as id FROM bucket order by id ASC" 

결과 같은 것을 사용할 때 나는 정렬되지 않습니다 얻고있다. 그러나 쿼리 콘솔에서 동일한 쿼리를 실행하면 예상대로 결과가 나타납니다. 그게 내가 rxJava에서 뭔가 잘못하고 있다고 믿게 만듭니다. 이 문제를 해결하도록 도와주세요.

답변

5

동시 스트림을 적용하지만 순서는 유지하지 않는 flatMap() 연산자 때문에 순서가 손실됩니다. A의 종료됩니다 다음

bucket.async(). 
     get(row.value().getString("id"))) 

가져 오기 작업의 각을 : 당신은 당신이 작성하고 각 행에 대해 의미, 각 onNext()에 대한 새로운 Observable에 가입되어 flatMap()를 적용하면, 병렬이 줄을 실행하고
다른 시간에 가져오고 가져온 내용은 ​​순서가 지정되지 않은 상태로 방출됩니다.

주문을 유지하려고하지만 병렬 처리가 손실되지 않도록 신경 쓰는 경우 concatMap()을 사용하면 활성 스트림이 1 개만 유지되며 각 가져 오기 작업을 순서대로 구독해야합니다.

병렬 처리가 필요하거나 필요하면 concatMapEager()을 사용하여 생성 된 Observable을 각각 병렬로 실행하지만 순서대로 항목을 방출해야합니다.

+2

동의합니다. concatMapEager는 제가 생각하기에 좋은 방법입니다. 뷰 & include docs에 대해서는 이미이 작업을 수행합니다. https://github.com/couchbase/couchbase-java-client/blob/master/src/main/java/com/couchbase/client/java/view/ViewQueryResponseMapper를 참조하십시오. .java # L220 당신이 세부 사항에 관심이 있다면. – daschl