2017-04-06 6 views
-1

저는 반응 형 프로그래밍이 새로 도입되었지만 호출 및 SMS 목록을 수집하고 단일 목록으로 결합하기 위해 rxjava를 적용하려고 시도했습니다. 아이디어는 순차적 인 항목이 나오는 것과 동시에 커서를 반복하는 것입니다. 이 후 모든 항목은 단일 목록으로 수집됩니다.유동적 인 2 명의 게시자의 결과 수집

그러나 doOnComplete에는 모든 데이터가 수집되지는 않습니다 (약 150 개의 항목이 있으며 ~ 25 개만 추가됨). 나는이 문제가 Flowable.concat이 모든 항목을 기다리지 않는다고 생각하지만 그것은 단지 제안 일뿐입니다.

public void readLatestActivity() { 
    Cursor callsCursor = getCallsCursor(); 
    Cursor smsCursor = getSmsCursor(); 

    if (callsCursor == null || smsCursor == null) { 
     return; 
    } 

    SortedSet<MActivity> activities = new TreeSet<>((left, right) -> left.getReceiveTime() < right.getReceiveTime() ? 1 : 0); 

    Flowable.concat(getCallsPublisher(callsCursor), getSmsPublisher(smsCursor)) 
      .subscribeOn(Schedulers.io()) 
      .doOnNext(new Consumer<MActivity>() { 
       @Override 
       public void accept(@NonNull MActivity mActivity) throws Exception { 
        if (mActivity != null){ 
         activities.add(mActivity); 
        } 
       } 
      }) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .doOnComplete(new Action() { 
       @Override 
       public void run() throws Exception { 
        Timber.d("doOnComplete"); 
        callsCursor.close(); 
        smsCursor.close(); 
        mBus.post(new EActivities(activities)); //<--- here I'm getting ~25 items 
       } 
      }) 
      .subscribe(); 
} 

private Publisher<MActivity> getCallsPublisher(Cursor callsCursor) { 
    return Flowable.fromIterable(RxCursorIterable.from(callsCursor)) 
      .take(callsCursor.getCount() < LIMIT ? callsCursor.getCount() : LIMIT) 
      .subscribeOn(Schedulers.computation()) 
      .parallel() 
      .runOn(Schedulers.computation()) 
      .map((Function<Cursor, MActivity>) this::readCallFromCursor) 
      .sequential() 
      .observeOn(AndroidSchedulers.mainThread()); 

} 

private Publisher<MActivity> getSmsPublisher(Cursor smsCursor) { 
    return Flowable.fromIterable(RxCursorIterable.from(smsCursor)) 
      .take(smsCursor.getCount() < LIMIT ? smsCursor.getCount() : LIMIT) 
      .subscribeOn(Schedulers.computation()) 
      .parallel() 
      .runOn(Schedulers.computation()) 
      .map((Function<Cursor, MActivity>) this::readSmsFromCursor) 
      .sequential() 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

sms를 읽는 방법과 통화가 정상적으로 작동합니다. 나는 문제가 다른 곳에 있다고 믿는다.

또한 나는 문제가 TreeSet을 만들 때 사용하는 비교에있다 생각 cursor 행위

public class RxCursorIterable implements Iterable<Cursor> { 

private Cursor mIterableCursor; 

public RxCursorIterable(Cursor c) { 
    mIterableCursor = c; 
} 

public static RxCursorIterable from(Cursor c) { 
    return new RxCursorIterable(c); 
} 

@Override 
public Iterator<Cursor> iterator() { 
    return RxCursorIterator.from(mIterableCursor); 
} 

private static class RxCursorIterator implements Iterator<Cursor> { 

    private final Cursor mCursor; 

    public RxCursorIterator(Cursor cursor) { 
     mCursor = cursor; 
    } 

    public static Iterator<Cursor> from(Cursor cursor) { 
     return new RxCursorIterator(cursor); 
    } 

    @Override 
    public boolean hasNext() { 
     return !mCursor.isClosed() && mCursor.moveToNext(); 
    } 

    @Override 
    public Cursor next() { 
     return mCursor; 
    } 
} 
} 

답변

1

Iterable처럼 만드는 클래스를 발견했습니다. left.getReceiveTime() > right.getReceiveTime() 일 때 -1을 반환하지 않습니다. 즉, 일부 활동이 중복으로 잘못 간주됩니다.

나는이처럼 getCallsPublisher 코드를 단순화 제안 :

return Flowable.generate(getCallsCursor(), 
     (cursor, emitter) -> { 
      if (!cursor.isClosed() && cursor.moveToNext()) 
       emitter.onNext(readCallFromCursor(cursor)); 
      else 
       emitter.onComplete(); 
     }, Cursor::close) 
    .take(LIMIT); 

그리고는 RxCursorIterable 치우는.

+0

답변 해 주셔서 감사합니다. 당신이 사용하고있는 함수를 이해하고 싶기 때문에 lambda없이이 스 니펫을 작성해 주시겠습니까? – AnZ

+1

@AnZ이 함수를 사용하고 있습니다 :'generate (initialState, generator, disposeState)'http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#generate(java.util.concurrent) .Callable, % 20io.reactivex.functions.BiConsumer, % 20io.reactivex.functions.Consumer) –