Spark Streaming + Accumulo 커넥터와 전체 사용 예제를 찾고 있습니다.스파크 스트리밍 + Accumulo - 일련 화 BatchWriterImpl
현재 Accumulo 테이블에 Spark Streaming 결과를 쓰려고하는데 BatchWriter에 대해 NotSerializableException이 발생합니다. 누군가 BatchWriter를 직렬화하는 방법에 대한 예제를 가르쳐 주시겠습니까? 아래 코드는 Accumulo 문서를 기반으로합니다.
현재 코드 : 런타임 오류 동안
val accumuloInstanceName = "accumulo"
val zooKeepers = "localhost:2181"
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers)
val accumuloUser = programOptions.accumuloUser()
val accumuloPassword = programOptions.accumuloPassword()
val passwordToken = new PasswordToken(accumuloPassword)
val connector = instance.getConnector(accumuloUser, passwordToken)
val accumuloBatchWriterConfig = new BatchWriterConfig
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory)
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig)
fullMergeResultFlatten.foreachRDD(recordRDD =>
recordRDD.foreach(record => {
val mutation = new Mutation(Longs.toByteArray(record.timestamp))
mutation.put("value", "", new Value(Longs.toByteArray(record.value)))
mutation.put("length", "", new Value(Longs.toByteArray(record.length)))
accumuloBatchWriter.addMutation(mutation)
})
)
가 발생
17/05/05 16:55:25 ERROR util.Utils: Exception encountered
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
나는 이것이 매우 일반적인 경우입니다 생각하지만 난 어떤 간단한 스파크 스트리밍 + accumulo 예를 찾을 수 없습니다.