2017-12-11 23 views
0

kafka의 데이터를 null 형식의 형식으로 수신합니다.평균 계산을위한 스파크 스트리밍

null,val1,val2,val3,val4,val5,val6,val7,...val23 
null,val1,val2,val3,val4,val5,val6,val7,...val23 
null,val1,val2,val3,val4,val5,val6,val7,...val23 

이제 다음 코드를 사용하여 null 키를 제거하고 새 키와 값 쌍을 형성하는 값을 매핑했습니다. 발스 (2) 캐릭터 키, 나머지 22 개 값은 값이됩니다 것

val topics = Array("kafka-topic") 
    val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
    ) 
    streamingContext.checkpoint("hdfs:///hdfs/location") 
    val record= stream.map(record=>record.value().toString) 


    val rdds=record.transform 
    { 
    pps=>pps.flatMap(_.split(",")) 
    } 

    val ppds= rdds.transform 
` `{ 
    pair=>pair.map(vals=> 
(vals(2).toString(),Set(vals(1).toLong,vals(2),vals(3),vals(4),val(5),val(6),val(7)....val(23) 
} 

.

이제 20 초의 시간 창에서 키당 모든 값의 평균을 얻고 키당 계산 된 평균을 데이터 저장소 (HBASE)로 계속 푸시하려고합니다. . 일괄 모드에서 나는 당신이 이것을 할 수있는 aggregatebykey() 메소드가 있음을 이해한다.

스트리밍 모드에서 어떻게 달성 할 수 있습니까?

일부 값은 문자열 일 가능성이 있습니다. 문자열 인 값을 건너 뛰고 숫자 형식의 평균 만 계산하는 동안 어떻게 HBASE로 업데이트를 계속 푸시합니까?

+0

스트리밍 용으로 아니었지만 비슷한 질문에 답해주었습니다. 다음 URL에서 귀하와 다른 사람들을 도울 수 있습니다. https://stackoverflow.com/questions/29930110/calculating- pairwise-kv-rdd-in-pyth와 함께 각각의 평균에 대한 평균값 –

답변

0

사용 reduceByKeyAndWindow, 예를 들어 위

// Reduce last 30 seconds of data, every 10 seconds 

val aggregateFunction = (a:Int,b:Int) => (a + b) 
val pairDStream = // DStream contains (word,1) 
val windowedWordCounts = pairDStream.reduceByKeyAndWindow(aggregateFunction, Seconds(30), Seconds(10)) 

당신은 더 복잡한 집계 함수를 작성하는 대신 위와 같이 간단한 추가 기능을 사용하는 윈도우 기간 동안 단어 수를 계산할 수 있습니다 사용됩니다 reduceByKeyAndWindow와 함께 사용하십시오.

자세한 내용은
https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/10%20Window%20Aggregations.html