저는 반응 형 프로그래밍이 새로 도입되었지만 호출 및 SMS 목록을 수집하고 단일 목록으로 결합하기 위해 rxjava를 적용하려고 시도했습니다. 아이디어는 순차적 인 항목이 나오는 것과 동시에 커서를 반복하는 것입니다. 이 후 모든 항목은 단일 목록으로 수집됩니다.유동적 인 2 명의 게시자의 결과 수집
그러나 doOnComplete
에는 모든 데이터가 수집되지는 않습니다 (약 150 개의 항목이 있으며 ~ 25 개만 추가됨). 나는이 문제가 Flowable.concat
이 모든 항목을 기다리지 않는다고 생각하지만 그것은 단지 제안 일뿐입니다.
public void readLatestActivity() {
Cursor callsCursor = getCallsCursor();
Cursor smsCursor = getSmsCursor();
if (callsCursor == null || smsCursor == null) {
return;
}
SortedSet<MActivity> activities = new TreeSet<>((left, right) -> left.getReceiveTime() < right.getReceiveTime() ? 1 : 0);
Flowable.concat(getCallsPublisher(callsCursor), getSmsPublisher(smsCursor))
.subscribeOn(Schedulers.io())
.doOnNext(new Consumer<MActivity>() {
@Override
public void accept(@NonNull MActivity mActivity) throws Exception {
if (mActivity != null){
activities.add(mActivity);
}
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Timber.d("doOnComplete");
callsCursor.close();
smsCursor.close();
mBus.post(new EActivities(activities)); //<--- here I'm getting ~25 items
}
})
.subscribe();
}
private Publisher<MActivity> getCallsPublisher(Cursor callsCursor) {
return Flowable.fromIterable(RxCursorIterable.from(callsCursor))
.take(callsCursor.getCount() < LIMIT ? callsCursor.getCount() : LIMIT)
.subscribeOn(Schedulers.computation())
.parallel()
.runOn(Schedulers.computation())
.map((Function<Cursor, MActivity>) this::readCallFromCursor)
.sequential()
.observeOn(AndroidSchedulers.mainThread());
}
private Publisher<MActivity> getSmsPublisher(Cursor smsCursor) {
return Flowable.fromIterable(RxCursorIterable.from(smsCursor))
.take(smsCursor.getCount() < LIMIT ? smsCursor.getCount() : LIMIT)
.subscribeOn(Schedulers.computation())
.parallel()
.runOn(Schedulers.computation())
.map((Function<Cursor, MActivity>) this::readSmsFromCursor)
.sequential()
.observeOn(AndroidSchedulers.mainThread());
}
sms를 읽는 방법과 통화가 정상적으로 작동합니다. 나는 문제가 다른 곳에 있다고 믿는다.
또한 나는 문제가TreeSet
을 만들 때 사용하는 비교에있다 생각
cursor
행위
public class RxCursorIterable implements Iterable<Cursor> {
private Cursor mIterableCursor;
public RxCursorIterable(Cursor c) {
mIterableCursor = c;
}
public static RxCursorIterable from(Cursor c) {
return new RxCursorIterable(c);
}
@Override
public Iterator<Cursor> iterator() {
return RxCursorIterator.from(mIterableCursor);
}
private static class RxCursorIterator implements Iterator<Cursor> {
private final Cursor mCursor;
public RxCursorIterator(Cursor cursor) {
mCursor = cursor;
}
public static Iterator<Cursor> from(Cursor cursor) {
return new RxCursorIterator(cursor);
}
@Override
public boolean hasNext() {
return !mCursor.isClosed() && mCursor.moveToNext();
}
@Override
public Cursor next() {
return mCursor;
}
}
}
답변 해 주셔서 감사합니다. 당신이 사용하고있는 함수를 이해하고 싶기 때문에 lambda없이이 스 니펫을 작성해 주시겠습니까? – AnZ
@AnZ이 함수를 사용하고 있습니다 :'generate (initialState, generator, disposeState)'http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#generate(java.util.concurrent) .Callable, % 20io.reactivex.functions.BiConsumer, % 20io.reactivex.functions.Consumer) –