2017-11-05 7 views
0

RxJava2을 사용하여 올바르게이 작업을 구현하는 방법을 결정할 수 없습니다.RxJava를 사용하여 여러 구독자 및 데이터 흐름을위한 전략 2

문제는 다음과 같습니다. AuidoRecord을 사용하여 오디오를 녹음합니다. 당신은 내가 수동으로 목록에 가입자를 추가하고 볼 수 있듯이

현재 그

private class StreamAudioRecordRunnable extends Flowable<short[]> implements Runnable { 
    private int mShortBufferSize; 
    private List<Subscriber<? super short[]>> mSubscribers = new ArrayList<>(); 
    private short[] mAudioShortBuffer; 

    private void removeAllNullableSubscribers() { 
     mSubscribers.removeAll(Collections.singleton(null)); 
    } 

    private void notifyAllSubscribers(short[] audioBuffer) { 
     removeAllNullableSubscribers(); 
     for (Subscriber<? super short[]> subscriber : mSubscribers) { 
      subscriber.onNext(audioBuffer); 
     } 
    } 

    @Override 
    protected void subscribeActual(Subscriber<? super short[]> newSubscriber) { 
     mSubscribers.add(newSubscriber); 
    } 

    private void notifyAllSubscribersAboutError(Throwable error) { 
     for (Subscriber<? super short[]> subscriber : mSubscribers) { 
      subscriber.onError(error); 
     } 
    } 

    @Override 
    public void run() { 
      // Init stuff 
      while (mIsRecording.get()) { 
       int ret; 
       ret = mAudioRecord.read(mAudioShortBuffer, 0, mShortBufferSize);        
       notifyAllSubscribers(mAudioShortBuffer); 
      } 
     mAudioRecord.release(); 
    } 
} 

같은 사용자 정의 Flowable 클래스를 구현했습니다. 그런 다음 새 버퍼를 얻으면 모든 구독자에게 알림이 전송됩니다.

나는 이것을 수행하는 데 가장 좋은 방법이 아니라고 생각합니다.

내가

  1. 지금까지와 같은 서비스에서이 유동성 실행 필요한 것. 구독자가없는 경우에도 서비스가 실행될 때까지 실행되어야합니다.
  2. 구독자가 일정하지 않아 구독하고 구독 취소 할 수 있지만 Flowable/Observable을 계속 실행해야합니다.
  3. Flowable에서 방출 한 데이터가 스트림이므로 구독자에게 이미 방출 된 항목에 대한 알림을받지 않아야하며 현재 스트리밍 데이터 만 가져와야합니다. 화재와 잊기.
  4. Flowable은 모든 구독자가 없어져도 실행되어야합니다.

구현하려면 올바른 전략을 제안하십시오. 도움을 주시면 감사하겠습니다.

+0

Runnable을 사용하는 이유가 있습니까? – JohnWowUs

+0

@JohnWowUs, 아니 내 사소한 실수 야, RxJava가 실행 스레드를 지정할 수있는 한 빨리 제거 할 것입니다. – bxfvgekd

+0

@JohnWowUs 구현 자체에 대해 무엇을 말할 수 있습니까? – bxfvgekd

답변

0

뭔가

public class StreamAudioRecordRunnable { 
    private int mShortBufferSize; 
    private short[] mAudioShortBuffer; 
    private ConnectedFlowable<short[]> audioFlowable(); 

    public StreamAudioRecordRunnable() { 
     audioFlowable = Flowable.create(new ObservableOnSubscribe<short[]>() { 
      @Override 
      public void subscribe(FlowableEmitter<short[]> emitter) throws Exception { 
       try { 
        while (mIsRecording.get()) { 
         int ret; 
         ret = mAudioRecord.read(mAudioShortBuffer, 0, mShortBufferSize);        
         emitter.onNext(mAudioShortBuffer); 
        } 
        emitter.onComplete(); 
        mAudioRecord.release(); 
       } catch (Exception e) { 
        emitter.onError(e); 
        mAudioRecord.release(); 
       } 
      } 
     }).subscribeOn(Schedulers.io()).publish(); 
    } 


    public Flowable<short[]> getFlowable() { 
      return audioFlowable.hide(); 
    } 

    @Override 
    public void start() { 
     audioObservable.connect();  
    } 
} 

처럼 내 취향이 될 것입니다.

+0

답변을 주셔서 감사합니다, 당신은이 접근법을 사용하여 찬성을 설명 할 수 있습니까? – bxfvgekd

+0

가능한 경우 상속보다는 컴포지션을 사용해야합니다. 가입자를 관리 할 필요가 없습니다. – JohnWowUs