2015-01-05 2 views
0

그것은 MQTTUtils 만 세 가지 방법, 데프 createStream 제공하는 것 (JSSC : JavaStreamingContext, brokerUrl : 문자열, 주제 : 문자열, storageLevel : StorageLevel) : JavaDStream [문자열]스파크

MQTT 공개 자에 의해 푸시 된 메시지를 수신하는 입력 스트림을 작성하십시오. def createStream (jssc : JavaStreamingContext, brokerUrl : String, topic : String) : JavaDStream [문자열]

MQTT 게시자가 푸시 한 메시지를받는 입력 스트림을 만듭니다. 데프 createStream (SSC : StreamingContext, brokerUrl : 문자열 주제 : 문자열 storageLevel : StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)은 : d 스트림 [문자열]

는 MQTT 출판사에 의해 푸시 메시지를 수신하는 입력 스트림을 만든다.

하지만 브로커에서 인증을 사용하는 경우 어떻게 사용자 이름과 비밀번호를 제공 할 수 있습니까?

답변

0

당신은 URL에 사용자 이름과 암호를 포함하여 시도 할 수 :

MQTT를 : // 사용자 이름 : 암호 @ 호스트 : 포트

+0

자격증 명을 사용하여 url에 mqtt 및 tcp 스키마를 사용해 보았습니다. mqtt 스키마는'java.lang.IllegalArgumentException'을 제공하고 tcp 스키마는'java.net.UnknownHostException'을 제공합니다. – ggalmazor

0

MQTT Scala Word Count Example을 찾아주세요.

는 특히이 사건이 당신의 ActiveMQ 브로커를 시작되었는지 확인하기 전에

bin/run-example org.apache.spark.examples.streaming.MQTTWordCount mqtt://username:[email protected]:port foo 

bin/run-example org.apache.spark.examples.streaming.MQTTPublisher mqtt://username:[email protected]:port foo 

가입자로 게시자를 실행합니다.

예제 코드 현재 사용할 수있는 사용자 정의 스파크 스트리밍 MQTT 커넥터 라이브러리를 사용하여 시도 할 수

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-core</artifactId> 
    <version>5.7.0</version> 
</dependency> 
0

import org.apache.activemq.broker.{TransportConnector, BrokerService} 
. 
. 
. 
. 
def startActiveMQMQTTBroker() { 
    broker = new BrokerService() 
    broker.setDataDirectoryFile(Utils.createTempDir()) 
    connector = new TransportConnector() 
    connector.setName("mqtt") 
    connector.setUri(new URI("mqtt:" + brokerUri)) 
    broker.addConnector(connector) 
    broker.start() 
} 

치어 파일 - https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0합니다.

이 라이브러리는 통신이 항상 고정

  • 추가 TLS 1.2 보안 있도록 원래 라이브러리의 상단에 다음을 추가합니다.
  • RDD의 페이로드와 함께 저장된 주제.

그래서, 당신은뿐만 아니라 RDD 스토리지 레벨을 통과 할 수 있습니다

val lines = MQTTUtils.createStream(ssc, // Spark Streaming Context 
      "ssl://URL",    // Broker URL 
      "<topic>",     // MQTT topic 
      "MQTT client-ID",   // Unique ID of the application 
      "Username", 
      "passowrd") 

가 과부하 생성자를 스트림을 만들려면 다음 방법을 사용합니다. 희망이 도움이됩니다.