2017-05-08 15 views
2

Spark Streaming + Accumulo 커넥터와 전체 사용 예제를 찾고 있습니다.스파크 스트리밍 + Accumulo - 일련 화 BatchWriterImpl

현재 Accumulo 테이블에 Spark Streaming 결과를 쓰려고하는데 BatchWriter에 대해 NotSerializableException이 발생합니다. 누군가 BatchWriter를 직렬화하는 방법에 대한 예제를 가르쳐 주시겠습니까? 아래 코드는 Accumulo 문서를 기반으로합니다.

현재 코드 : 런타임 오류 동안

val accumuloInstanceName = "accumulo" 
val zooKeepers = "localhost:2181" 
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers) 
val accumuloUser = programOptions.accumuloUser() 
val accumuloPassword = programOptions.accumuloPassword() 
val passwordToken = new PasswordToken(accumuloPassword) 
val connector = instance.getConnector(accumuloUser, passwordToken) 

val accumuloBatchWriterConfig = new BatchWriterConfig 
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024 
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory) 
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig) 
fullMergeResultFlatten.foreachRDD(recordRDD => 
    recordRDD.foreach(record => { 
    val mutation = new Mutation(Longs.toByteArray(record.timestamp)) 
    mutation.put("value", "", new Value(Longs.toByteArray(record.value))) 
    mutation.put("length", "", new Value(Longs.toByteArray(record.length))) 
    accumuloBatchWriter.addMutation(mutation) 
    }) 
) 

가 발생

17/05/05 16:55:25 ERROR util.Utils: Exception encountered 
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 

나는 이것이 매우 일반적인 경우입니다 생각하지만 난 어떤 간단한 스파크 스트리밍 + accumulo 예를 찾을 수 없습니다.

답변

0

elserj가 지적했듯이 연결 개체를 serialize하는 것은 일반적으로 올바른 패턴이 아닙니다. 내가 본 패턴은 RDD.foreachPartition()을 사용하여 Spark 작업자 노드에서 직접 연결을 시작하는 것입니다. 이는 거의 불가능한 각 개별 레코드에 대해 새로운 연결을 만드는 것과 달리 작업 일괄 처리마다 연결을 생성 할 수 있기 때문에 좋습니다.

예 :

fullMergeResultFlatten.foreachRDD(recordRDD => { 
    recordRDD.foreachPartition(partitionRecords => { 
    // this connection logic is executed in the Spark workers 
    val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig) 
    partitionRecords.foreach(// save operation) 
    accumuloBatchWriter.close() 
    }) 
}) 
0

BatchWriter 클래스를 직렬화 할 수 없습니다. 귀하의 코드를 수정하는 방법에 대한 제안이 없지만, 그 클래스를 직렬화하려고 시도하는 것이 적절한 방법이 아니라고 말할 수 있습니다.