2017-01-07 8 views
1

내 Android 앱에서 Socket을 사용하여 바이트 배열을 송수신해야합니다. 편의상 나는 Socket에 연결된 Observable으로 작업하고 싶습니다. 인터넷에서 찾고배열을 관찰 할 수 있도록 TCP 소켓 켜기 [Byte]

나는이 코드를 발견했습니다는 "안녕하세요"문자열을 보낼 때, 예를 들어, 한 번에

import rx.lang.scala.Observable 

val s = Observable.using[Char,Socket](new Socket("10.0.2.2", 9002))(
    socket => Observable.from[Char](Source.fromInputStream(socket.getInputStream).toIterable), 
    socket => Try(socket.close)) 
    .subscribeOn(rx.lang.scala.schedulers.IOScheduler.apply) 

    val a = s.subscribe(println, println) 
그것은 작동

하지만 출력 한 문자를 출력은 다음과 같습니다

I/System.out: h 
I/System.out: e 
I/System.out: l 
I/System.out: l 
I/System.out: o 
I/System.out: 
I/System.out: t 
I/System.out: h 
I/System.out: e 
I/System.out: r 
I/System.out: e 

그러나 구독에 버퍼링 된 바이트 배열을 수신하려고합니다. 어떻게하면 될까요?

+1

을 당신이'바이트 []'을 방출 할 않는 경우? '\ n '이나'\ r \ n'문자가 수신 된 후 또는 소켓이 닫힌 후에 'N'문자가 수신 된 후? (또는 완전히 다른 무엇인가?) –

+0

@SeanVieira 너무 자주 (각각의받은 바이트에서) 방출하기를 원하지 않는다. 일단 채워지면 방출 될 수있는 고정 크기의 청크를 생각하고 있었다. – Anton

+0

@SeanVieira 또는 전송 된 전체 메시지가 단일 바이트 배열로 출력 될 수 있습니다 (일반 TCP 소켓을 사용할 때 전체 메시지의 websocket 스타일 분리를 수행 할 수 있는지 확실하지 않음) . – Anton

답변

1

@SeanVieira가 이미 말했듯이 먼저 스트림 요소 인 문자를 집계하는 방법을 결정해야합니다. 각 메시지가 끝나면 스트림이 닫히는 것을 알고 있으면 전체 메시지를 수신 할 때까지 기다린 다음 onCompleted()에서 시퀀스를 내보낼 수 있습니다.

저는 여러분이 지금까지 구현 한 것이 아주 훌륭하다고 생각합니다. 그것은 관찰자가 무엇을 어떻게 처리해야하는지에 달려 있기 때문입니다.

필요에 따라 스트림 변환을 추가 할 수 있습니다. 지. 그것은이 SO answer

tumblingBuffer를 사용하는 솔루션에서 수행 될 때

  • buffer, 특히, tumblingBuffer(boundary) 봐) 버퍼와
  • debounce이 당신의 이미 만들어진 다음과 같이 볼 수 있었다 관찰 가능한 (테스트되지 않음) :

    source.tumblingBuffer(source.filter(_ == '\n')) 
    

    경계 관찰 가능 source.filter(...)이 요소를 방출하면 전체 버퍼를 방출합니다. 그럼 당신은 그 피 감시에 mkString를 사용하여 문자열에있는 문자의 순서를 변환하고 구독 할 수 있습니다 :

    source.tumblingBuffer(source.filter(_ == '\n')).map(mkString(_))