저는 RxJava와 반응 형 프로그래밍이 완전히 새롭습니다. 파일을 읽고 Observable에 저장해야하는 과제가 있습니다. 나는 내부에 BufferedReader를 사용하여 Callable을 만들고 Observable.fromCallable()을 사용하려고 시도했지만 많이 작동하지 않았습니다.RxJava. 관측 가능 파일 읽기
내가 어떻게 할 수 있습니까?
RxJava 2.0을 사용하고 있습니다.
저는 RxJava와 반응 형 프로그래밍이 완전히 새롭습니다. 파일을 읽고 Observable에 저장해야하는 과제가 있습니다. 나는 내부에 BufferedReader를 사용하여 Callable을 만들고 Observable.fromCallable()을 사용하려고 시도했지만 많이 작동하지 않았습니다.RxJava. 관측 가능 파일 읽기
내가 어떻게 할 수 있습니까?
RxJava 2.0을 사용하고 있습니다.
this implementation과 유사한 기능을 수행 할 수 있습니다. 이 방법으로
그리고 그 클래스를 사용하는 here :
public static Observable<byte[]> from(InputStream is, int size) {
return Observable.create(new OnSubscribeInputStream(is, size));
}
은 결국 당신은 그것을 사용할 수 있습니다
Observable<byte[]> chunks = Bytes.from(file, chunkSize);
자세한 내용 here합니다.
는나는 데이터를 생성 한 다음 관찰자 때까지 관찰의 생성을 연기 중첩 클래스 FileObservableSource
를 사용하여 기본 솔루션은 구독 :
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
public class StackOverflow {
public static void main(String[] args) {
final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
observable.subscribe(
line -> System.out.println("next line: " + line),
Throwable::printStackTrace,
() -> System.out.println("finished")
);
}
static class FileObservableSource implements ObservableSource<String> {
private final String filename;
FileObservableSource(String filename) {
this.filename = filename;
}
@Override
public void subscribe(Observer<? super String> observer) {
try {
Files.lines(Paths.get(filename)).forEach(observer::onNext);
observer.onComplete();
} catch (IOException e) {
observer.onError(e);
}
}
}
}
이것이 배압 하에서 어떻게 작동합니까? 구독자가 읽기보다 느린 각 행을 처리하는 다른 스레드에서 관찰하고 있다고 가정 해 봅시다. – 0cd
그것은 나쁘게 행동 할 것입니다. 배압은 여기에서 처리되지 않습니다. 나는 반응 형 스트림 구현을 사용 하겠지만이 질문은 기본적인 예에 관한 것입니다. –
하지만 난 chunksize 영역을 몰라 및 파일 라인을 읽을 필요가있는 경우 줄 단위로? –
그러면 코드를 수정하여'InputStream'을'ReadLine' 메소드를 사용하여'BufferedReader'로 변환해야합니다. – GVillani82