kafka의 데이터를 null 형식의 형식으로 수신합니다.평균 계산을위한 스파크 스트리밍
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
null,val1,val2,val3,val4,val5,val6,val7,...val23
이제 다음 코드를 사용하여 null 키를 제거하고 새 키와 값 쌍을 형성하는 값을 매핑했습니다. 발스 (2) 캐릭터 키, 나머지 22 개 값은 값이됩니다 것
val topics = Array("kafka-topic")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
streamingContext.checkpoint("hdfs:///hdfs/location")
val record= stream.map(record=>record.value().toString)
val rdds=record.transform
{
pps=>pps.flatMap(_.split(","))
}
val ppds= rdds.transform
` `{
pair=>pair.map(vals=>
(vals(2).toString(),Set(vals(1).toLong,vals(2),vals(3),vals(4),val(5),val(6),val(7)....val(23)
}
.
이제 20 초의 시간 창에서 키당 모든 값의 평균을 얻고 키당 계산 된 평균을 데이터 저장소 (HBASE)로 계속 푸시하려고합니다. . 일괄 모드에서 나는 당신이 이것을 할 수있는 aggregatebykey() 메소드가 있음을 이해한다.
스트리밍 모드에서 어떻게 달성 할 수 있습니까?
일부 값은 문자열 일 가능성이 있습니다. 문자열 인 값을 건너 뛰고 숫자 형식의 평균 만 계산하는 동안 어떻게 HBASE로 업데이트를 계속 푸시합니까?
스트리밍 용으로 아니었지만 비슷한 질문에 답해주었습니다. 다음 URL에서 귀하와 다른 사람들을 도울 수 있습니다. https://stackoverflow.com/questions/29930110/calculating- pairwise-kv-rdd-in-pyth와 함께 각각의 평균에 대한 평균값 –