2017-12-15 13 views
0

하자 내가 2 개 스트림이 있다고 출력에 TimeWindow에 지정된 시간 범위 내에있는 숫자의 합계를 포함하는 TimeWindow 객체가 포함되도록 스트림을 조인할까요?
구체적으로 말해서 기간 저장소의 위치는 XXX입니다. win.getDuration() 여기서 이 ValueJoiner에서 참조되는 것입니다.JoinWindows 기간이 스트림 중 하나의 객체에 저장된 2 개의 Kafka KStream을 결합 할 수 있습니까?</li> </ol> <p>사용자 DSL의 API 중 하나 또는 프로세스 API에 그것이 가능 (시간을 시작으로, 종료 시간) (타임 스탬프)</li> <li>번호</p> <ol> <li>시간 창 :

timeWindow.join( 
    numbers, 
    (ValueJoiner<TimeWindow, Number, TimeWindow>) (win, num) -> win.addToTotal(num), 
    new JoinWindows(XXX, 0) 
).to("output_Topic"); 

이후의 JoinWindows는 TimeWindow의 타임 스탬프가 endtime이기 때문에 0입니다. XXX 기간은 TimeWindows 종료 시간 (밀리 초 단위의 시작 시간)으로 계산되어야합니다.

많은 도움을 주셔서 감사합니다.

+0

DSL을 사용할 수 없지만 프로세서 API를 사용하면 무엇이든 만들 수 있습니다. 그러나 SO가 제공 할 수있는 범위를 벗어나는 방법에 대한 조언을 제공합니다. 그것은 꽤 깊은 질문입니다. 커스텀 스토어를 사용하고 join을 계산하는 자신의'Processor'를 작성해야합니다 ... –

답변

0

Matthias의 도움 덕분에 TimestampExtractors 구현과 메모리 상태 저장소 (기본값은 RockDB 사용)에서이 기능을 구현하기 위해 프로세서 API를 사용하도록 롤백했습니다.