Zeppelin을 사용하여 SnappyData 스트리밍 테이블을 만들려고합니다. 나는 몇 단락으로 나누어 져 'rowConverter'SnappyData + Zeppelin + Kafka 스트리밍 - 스트리밍 테이블을 만드는 중 오류가 발생했습니다.
제플린 노트북 인수에 스트림 테이블 정의와 문제가 :
조 제 1 항 :
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{SchemaDStream, StreamToRowsConverter}
class RowsConverter extends StreamToRowsConverter with Serializable {
override def toRows(message: Any): Seq[Row] = {
val log = message.asInstanceOf[String]
val fields = log.split(",")
val rows = Seq(Row.fromSeq(Seq(new java.sql.Timestamp(fields(0).toLong),
fields(1),
fields(2),
fields(3),
fields(4),
fields(5).toDouble,
fields(6)
)))
rows
}
}
제 2 항 :
snsc.sql(
"CREATE STREAM TABLE adImpressionStream if not exists ("sensor_id string, metric
metric string) using kafka_stream
options (storagelevel 'MEMORY_AND_DISK_SER_2',
rowConverter 'RowsConverter',
zkQuorum 'localhost:2181',
groupId 'streamConsumer', topics 'test'");"
)
첫 번째 단락에서 오류를 반환합니다.
(210)두 번째 문단 :
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: RowsConverter
내가 자식에서 기본 코드를 사용하는 것을 시도하고있다 :
snsc.sql("create stream table streamTable (userId string, clickStreamLog string) " +
"using kafka_stream options (" +
"storagelevel 'MEMORY_AND_DISK_SER_2', " +
" rowConverter 'io.snappydata.app.streaming.KafkaStreamToRowsConverter' ," +
"kafkaParams 'zookeeper.connect->localhost:2181;auto.offset.reset->smallest;group.id->myGroupId', " +
"topics 'test')")
하지만 비슷한 오류가 있습니다
java.lang.RuntimeException: Failed to load class : java.lang.ClassNotFoundException: io.snappydata.app.streaming.KafkaStreamToRowsConverter
당신이 좀 도와 수 이 문제? 정말 고마워요.
작동 원리 : D https://github.com/SnappyDataInc/snappy-poc#lets-get-this-going의 단계별 조치를 취했습니다. 지금은 제플린이 없으면 작동하는지 확인하십시오. 스트리밍 작업 - 테이블에서 데이터를 선택할 수는 있지만 Spark에는 응용 프로그램이 표시되지 않습니다. 또한 4040 포트의 WebUI는 나를 사용할 수 없습니다. – Tomtom
포트 5050을 사용해 볼 수 있습니까? –