2016-06-01 6 views
2

스파크 스트리밍을 사용하여 특정 간격으로 제로 MQ 대기열에서 데이터를 수신하고이를 풍부하게하여 여기 엔 나무 마루 파일로 저장합니다. 하나의 스트리밍 창에서 다른 스트리밍 창으로 데이터를 비교하고 싶습니다. (나중에 마루 파일을 사용하여 시간을 비교합니다.)spark에 대한 스트리밍 창 타임 스탬프 가져 오기

타임 스탬프에서 특정 스트리밍 창을 어떻게 찾을 수 있습니까? 간단히 말해서

JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(duration)); 
inputStream = javaStreamingContext.receiverStream(new StreamReceiver(hostName, port, StorageLevel.MEMORY_AND_DISK_SER())); 
JavaDStream<myPojoFormat> enrichedData = inputStream.map(new Enricher()); 

나는 각 스트리밍 윈도우의 타임 스탬프를합니다.

답변

2

당신은 Function2의 매개 변수를 가져 JavaDStreamtransform 방법을 사용할 수 있습니다 (레벨하지만 배치 수준을 기록하지 않음). Function2RDDTime 개체를 가져오고 새 RDD를 반환합니다. 전체 결과는 새로 입력 한 JavaDStream이되며 선택한 논리에 따라 RDD이 trasformed되었습니다.

+0

마크, 신속한 답변을 보내 주셔서 감사합니다. 나는 이미 inputdata에 대한지도를 만들고 있는데, 나는 내 데이터의 각 행에 "일괄 처리 타임 스탬프"를 추가하고 싶다. 나는 두려워, 나는 행 레벨에 추가 할 필요가 있기 때문에 변환을 통해 가능하지 않다고 생각한다. 내가 틀렸다면 나를 바로 잡으십시오. – Count

+0

물론 변형 방법을 통해 각 행에 타임 스탬프를 추가 할 수 있습니다. 내부 변환을하면 각 rdd의 각 줄마다 타임 스탬프를 추가하는 rdd.map 메서드가 있습니다. rdd 당 하나의 타임 스탬프가 있습니다.이 rdd의 생성 타임 스탬프는 ... – mgaido

+0

감사합니다. 그것도 (하지만 늦게), 내가 제안한 변환 내지도를 옮겼습니다, 잘 작동하는 것 같습니다. – Count