2017-04-16 6 views
2

저는 RxJava와 반응 형 프로그래밍이 완전히 새롭습니다. 파일을 읽고 Observable에 저장해야하는 과제가 있습니다. 나는 내부에 BufferedReader를 사용하여 Callable을 만들고 Observable.fromCallable()을 사용하려고 시도했지만 많이 작동하지 않았습니다.RxJava. 관측 가능 파일 읽기

내가 어떻게 할 수 있습니까?

RxJava 2.0을 사용하고 있습니다.

답변

0

this implementation과 유사한 기능을 수행 할 수 있습니다. 이 방법으로

그리고 그 클래스를 사용하는 here :

public static Observable<byte[]> from(InputStream is, int size) { 
    return Observable.create(new OnSubscribeInputStream(is, size)); 
} 

은 결국 당신은 그것을 사용할 수 있습니다

Observable<byte[]> chunks = Bytes.from(file, chunkSize); 

자세한 내용 here합니다.

+0

하지만 난 chunksize 영역을 몰라 및 파일 라인을 읽을 필요가있는 경우 줄 단위로? –

+1

그러면 코드를 수정하여'InputStream'을'ReadLine' 메소드를 사용하여'BufferedReader'로 변환해야합니다. – GVillani82

2

나는 데이터를 생성 한 다음 관찰자 때까지 관찰의 생성을 연기 중첩 클래스 FileObservableSource를 사용하여 기본 솔루션은 구독 :

import io.reactivex.Observable; 
import io.reactivex.ObservableSource; 
import io.reactivex.Observer; 

import java.io.IOException; 
import java.nio.file.Files; 
import java.nio.file.Paths; 

public class StackOverflow { 

    public static void main(String[] args) { 
     final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml")); 
     observable.subscribe(
       line -> System.out.println("next line: " + line), 
         Throwable::printStackTrace, 
         () -> System.out.println("finished") 
       ); 
    } 

    static class FileObservableSource implements ObservableSource<String> { 

     private final String filename; 

     FileObservableSource(String filename) { 
      this.filename = filename; 
     } 

     @Override 
     public void subscribe(Observer<? super String> observer) { 
      try { 
       Files.lines(Paths.get(filename)).forEach(observer::onNext); 
       observer.onComplete(); 
      } catch (IOException e) { 
       observer.onError(e); 
      } 
     } 
    } 
} 
+0

이것이 배압 하에서 어떻게 작동합니까? 구독자가 읽기보다 느린 각 행을 처리하는 다른 스레드에서 관찰하고 있다고 가정 해 봅시다. – 0cd

+0

그것은 나쁘게 행동 할 것입니다. 배압은 여기에서 처리되지 않습니다. 나는 반응 형 스트림 구현을 사용 하겠지만이 질문은 기본적인 예에 ​​관한 것입니다. –