2017-11-29 15 views
1

현재 Bluetooth 장치에 대한 프로토콜을 구현 중이며 RxAndroidBle 라이브러리 (버전 1.4.3)를 사용 중입니다.첫 번째 방사에 의한 특성 표시 방출 횟수 결정

특성에 쓰고 특성 알림을 통해 응답을 듣는 방식으로 장치에서 데이터를 요청해야합니다.

내가의 코드를 사용하고 (쓰기, 듣기) 2 개 작업을 결합하려면 :이 방법은 나를 위해 작동 https://stackoverflow.com/a/41140523/734385

connectionObservable 
     .flatMap(// when the connection is available... 
       rxBleConnection -> rxBleConnection.setupNotification(AP_SCAN_DATA), // ... setup the notification... 
       (rxBleConnection, apScanDataNotificationObservable) -> Observable.combineLatest(// ... when the notification is setup... 
         rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue), // ... write the characteristic... 
         apScanDataNotificationObservable.first(), // ... and observe for the first notification on the AP_SCAN_DATA 
         (writtenBytes, responseBytes) -> responseBytes // ... when both will appear return just the response bytes... 
       ) 
     ) 
     .flatMap(observable -> observable) 

, 유일한 문제는 (코드 첫 번째 20 바이트 나에게 준다이다 apScanDataNotificationObservable.first()으로 인해).

불행히도, 나는 받고있는 패키지의 크기를 알지 못합니다. 나는 처음 20 바이트의 머리말에서만 정보를 추출 할 수있다. 버퍼 기능이 모두 미리 크기를 알아야하는 것처럼 보입니다.

위의 코드를 Rx 체인의 일부로 사용하여이 작업을 정상적으로 수행 할 수있는 방법이 있습니까?

즉, Rx 체인의 첫 방출을 기준으로 방출 수를 제어 할 수 있습니까?

아니면 완전히 잘못된 접근이 있습니까?

답변

0

원하는 것을 얻을 수 있습니다.

가장 쉬운 방법은 교환 할 수있을 것 Observable.combineLatest(...)에 :

newResponseEndWatcher()는 수신 된 값이 모두가 예상되는 경우 결정하는 로직을 포함해야
Observable.merge(
     rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue).ignoreElements(), // send the request but ignore the returned value 
     apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()) // take the response notifications until the response end watcher says so 
); 

. 그것은 다음과 같이 수 :

private Func1<byte[], Boolean> newResponseEndWatcher() { 
    return new Func1<byte[], Boolean>() { 

     private static final int NOT_INITIALIZED = -1; 

     private int totalLength = NOT_INITIALIZED; 
     private int receivedLength = NOT_INITIALIZED; 

     @Override 
     public Boolean call(byte[] bytes) { 
      if (isNotInitialized(totalLength)) { // if it is the first received value 
       // parse totalLength from the header 
      } 
      // update receivedLength 
      return receivedLength >= totalLength; 
     } 

     private boolean isNotInitialized(int value) { 
      return value == NOT_INITIALIZED; 
     } 
    }; 
} 

그냥 생각하고있는 결과 newResponseEndWatcher()입니다 Func1은 상태입니다. apScanDataNotificationObservable.takeUntil(newResponseEndWatcher())의 결과 인 관찰 가능을 변수에 저장하는 경우 다음 구독은 조기에 종료 될 수 있습니다.

는이 등록 될 때마다 newResponseEndWatcher()를 호출 한 다음 apScanDataNotificationObservable.takeUntil(newResponseEndWatcher) 새로운 만들 것 하나 Observable.using() 기능을 사용할 수 있습니다이 문제를 완화하기 위해 : 빠른 응답을위한

Observable.using(
     () -> newResponseEndWatcher(), // create a new response end watcher on each subscription 
     responseEndWatcher -> apScanDataNotificationObservable.takeUntil(responseEndWatcher), // create the response observable that will complete properly 
     responseEndWatcher -> { /* ignored, responseEndWatcher will get GCed eventually */ } 
); 
+0

덕분에, 내가 코드를 시도하고 내가 무엇입니까 전체 패키지,하지만 모든 방출을 수집 toList() 사용하려고했지만 내가 생각하기 때문에 그 이유는 takeUntil 포함되지 않습니다이며 그것은 마지막 방출 (따라서 또한 toList()) 그것이 유일한 방출 일 것 인 것에 따라 나는 원래의 코드를 사용하고, 방출을 모으기 위해 flatmap를 사용하고, 항상 Observable.empty를 돌려 주어서 그것을 위해 지금 해결했다() 마지막 배출까지 – tiqz

+0

'Observable.takeUntil (Func1)'Javadoc :'이 연산자와이 함수의 차이점은 여기에 있습니다. 항목이 방출 된 후 조건이 평가됩니다. 그래서 나는 그것이 사실이라고 생각하지 않는다? 어쩌면 newResponseEndWatcher() 구현에 off-by-one 실수가있을 수 있습니까? –

+0

@tiqz 더 자세한 정보가 필요하십니까? '.toList()'는 완료된'Observable'과'newResponseEndWatcher()'의 구현이 맞으면'apScanDataNotificationObservable.takeUntil (newResponseEndWatcher())'문제없이 완료해야합니다. –