2017-10-20 4 views
1

RxJava를 사용하기 위해 내 (안드로이드) 애플리케이션의 일부 로직을 변환하려고 합니다만, 좀 더 진보 된 로직을 수행하는 방법을 찾기 위해 고심하고 있습니다.RxJava로 기존 데이터 업데이트 및 유지하기

사용 사례는 다음과 같습니다. 나는 사용자에게 일종의 피드를 보여주고 싶습니다. 피드는 다양한 출처의 항목으로 구성됩니다 (예 : 메시지, 기사 등. API 제한으로 인해 앱 자체는 개별 소스를 모아 하나의 목록에 표시해야합니다.

class FeedItem { 
    Type feedItem; // Type of the item, e.g. article, message, etc. 
    ... 
} 

현재 공급이 별도의 스레드를 기반으로하고, UI 피드가 업데이트되었을 때 리스너를 사용하여 통지 : 다음과 같이 피드에서

예를 들어

, 가정 내 항목은 다음과 같습니다. 어떻게 완료했는지 알기 위해 아래에 (가짜) 일부 Java 코드 (스레딩 및 기타 관리 코드는 생략 함)가 있습니다.

class FeedProducer { 
    List<FeedItem> currentData = new ArrayList(); 

    public void refreshData() { 
     for (FeedSource source: getFeedSources()) { 
      Type sourceType = source.getType(); 
      // Remove existing items 
      currentData.removeIf(item -> item.feedItem.equals(sourceType)); 
      List<FeedItem> newItems = source.produceItems(); 
      // Add the new items 
      currentData.addAll(newItems); 
      // Notify the UI things have changed 
      notifyDataChanged(currentData); 
     } 
     // Notify the UI we are done loading 
     notifyLoadingComplete(); 
    } 
} 

이 방법 refreshData()은 사용자가 데이터를 새로 고침 할 때마다 호출됩니다. 이렇게하면 일부 소스 만 업데이트 할 수 있지만 다른 소스는 그대로 유지할 수 있습니다 (예 : getFeedSources()의 반환 값 변경).

이러한 소스는 앱의 다른 부분에서도 개별적으로 사용됩니다. 나는 그들을 Observables로 바꾸었다. 이로 인해 작업이 훨씬 쉬워졌습니다. 데이터베이스가 변경되면 변경 사항은 Observable에 의해 UI로 푸시됩니다.

내 질문은 어떻게 이러한 Observable 소스를 하나의 Observable로 병합 할 수 있습니까? 그러나 이전 결과의 "글로벌"상태가있는 경우에 어떻게됩니까? 다양한 결합 연산자를 살펴 보았지만 필요한 항목을 찾지 못했습니다. RxJava를 처음 접했을 때 나는 명백한 것을 간과 할 경우 사과드립니다.

답변

1

나이브 옵션하고자 할 때 당신은 새로운 데이터를 요청하는 호출자 마지막 목록을 저장하고 매개 변수로 제공 :

public class ReactiveMultipleSources { 

    // region Classes 
    public enum SourceType { 
     TYPE_ARTICLE, 
     TYPE_MESSAGE, 
     TYPE_VIDEO 
    } 

    public static class Feed { 
     private SourceType sourceType; 
     private String content; 

     Feed(SourceType sourceType, String content) { 
      this.sourceType = sourceType; 
      this.content = content; 
     } 

     SourceType getSourceType() { 
      return sourceType; 
     } 
    } 
    // endregion 

    public static void main(String[] args) throws InterruptedException { 
     final List<Feed>[] currentList = new List[]{new ArrayList()}; 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 
     System.out.println(); 

     // Simulate refresh 
     refreshContent(currentList[0]) 
       .subscribe(feeds -> { 
        currentList[0] = feeds; 

        for (int i = 0; i < currentList[0].size(); i++) { 
         System.out.println(currentList[0].get(i).content); 
        } 
       }); 
     Thread.sleep(2000); 


    } 

    private static Observable<List<Feed>> refreshContent(@NotNull List<Feed> currentFeed) { 
     return Observable.fromIterable(getSourceTypes()) 
       .observeOn(Schedulers.io()) 
       // Get List<Feed> forEach sourceType 
       .concatMap(ReactiveMultipleSources::getFeedItemsBySourceType) 
       .observeOn(Schedulers.computation()) 
       // Get list of "List of Feed for sourceType", = List<List<Feed>> 
       .toList() 
       .map(lists -> { 
        for (List<Feed> list : lists) { 
         SourceType sourceType = list.get(0).getSourceType(); 
         // Remove items of currentFeed whose sourceType has new List<Feed> 
         currentFeed.removeIf(temp -> temp.getSourceType() == sourceType); 
         // Add new items 
         currentFeed.addAll(list); 
        } 
        return currentFeed; 
       }) 
       .toObservable(); 
    } 

    // region Helper 
    private static List<SourceType> getSourceTypes() { 
     return new ArrayList<>(Arrays.asList(SourceType.values())); 
    } 

    private static Observable<List<Feed>> getFeedItemsBySourceType(SourceType sourceType) { 
     String content; 
     if (sourceType == SourceType.TYPE_ARTICLE) 
      content = "article "; 
     else if (sourceType == SourceType.TYPE_MESSAGE) 
      content = "message "; 
     else if (sourceType == SourceType.TYPE_VIDEO) 
      content = "video "; 
     else 
      content = "article "; 

     Feed feed1 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed2 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed3 = new Feed(sourceType, content + createRandomInt()); 
     Feed feed4 = new Feed(sourceType, content + createRandomInt()); 

     return Observable.just(Arrays.asList(feed1, feed2, feed3, feed4)); 
    } 

    // For simulating different items each time List<Feed> is required 
    private static int createRandomInt() { 
     return ThreadLocalRandom.current().nextInt(0, 21); 
    } 
    // endregion 
} 

예를 출력 :

article 19 
article 15 
article 18 
article 18 
message 3 
message 2 
message 9 
message 1 
video 19 
video 17 
video 18 
video 11 

article 0 
article 4 
article 18 
article 15 
message 11 
message 16 
message 16 
message 4 
video 1 
video 7 
video 20 
video 2 
1

[0, 1], [10, 11][20, 21]을 반환하는 개별 작업이있는 경우 단일 목록으로 병합하고 싶습니다. 이 경우 zip 작업을 사용할 수 있습니다.

public class TestRx { 
    public static void main(String[] args) { 
     // some individual observables. 
     Observable<List<Integer>> observable1 = Observable.just(Arrays.asList(0, 1)); 
     Observable<List<Integer>> observable2 = Observable.just(Arrays.asList(10, 11)); 
     Observable<List<Integer>> observable3 = Observable.just(Arrays.asList(20, 21)); 

     Observable.zip(observable1, observable2, observable3, 
       new Func3<List<Integer>, List<Integer>, List<Integer>, List<Integer>>() { 
        @Override 
        public List<Integer> call(List<Integer> list1, List<Integer> list2, List<Integer> list3) { 
         // TODO: Remove existing items 

         // merge all lists 
         List<Integer> mergedList = new ArrayList<>(); 
         mergedList.addAll(list1); 
         mergedList.addAll(list2); 
         mergedList.addAll(list3); 
         return mergedList; 
        } 
       }) 
       .subscribe(new Observer<List<Integer>>() { 
        @Override 
        public void onNext(List<Integer> mergedList) { 
         System.out.println(mergedList); 
         // TODO: notifyDataChanged(mergedList) 
        } 

        @Override 
        public void onError(Throwable throwable) { 
         System.out.println(throwable.toString()); 
         // TODO: handle exceptions 
        } 

        @Override 
        public void onCompleted() { 
         // TODO: notifyLoadingComplete() 
        } 
       }); 
    } 
} 

결과적으로 [0, 1, 10, 11, 20, 21]과 같이 인쇄됩니다.