여기에 문제가 있습니다 :실제로 늦게 기록을 삭제하는 방법은 무엇입니까?
1 시간짜리 버킷에서이 숫자 중 MAX를 수집하려고합니다. 여기서 버킷에 최대 3 시간의 지연을 허용합니다.
tumbling windows의 경우처럼 들립니다.
는 여기에 지금까지이 작업은 다음과 같습니다
stream.aggregate(
() -> 0L,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)),
Serdes.Long(),
"my_store"
)
우선 나는 이것이 실제로 테스트를 주어진 일이 발생 확인할 수 없습니다. timestamp는 TimestampExtractor를 통해 추출되며 Thread.sleep
으로 지연을 시뮬레이트합니다 (테스트를 위해 창을 더 작은 값으로 설정 함). 그러나 "늦은 레코드"는 여전히 처리됩니다.
일반 창에는 예제가 거의없는 것 같습니다. SessionWindows에 대한 통합 테스트가 있지만 그게 전부입니다. 나는 개념을 정확하게 이해하고 있는가?
편집 2
샘플 JUnit 테스트. 그것이 큰이기 때문에 나는 그것을 Gist를 통해 공유합니다.
https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6
EDIT
데이터 포인트는 (데이터가 수집되었을 때의) 타임 스탬프를 가지고 (기타 코드를 추가)의 데이터 값을 수집 시스템의 호스트.
{
"collectedAt": 12314124134, // timestamp
"hostname": "machine-1",
"reading": 3
}
사용자 정의 타임 스탬프 추출기를 사용하여 collectedAt
을 가져옵니다.
source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>"
.groupByKey(Serdes.String(), roundDataSerde)
.aggregate(
() -> RoundData.EMPTY_ROUND,
(aggKey, value, aggregate) -> max(value, aggregate),
TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
.until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
roundDataSerde,
"entries_store"
)
.toStream()
.map(this::simpleRoundDataToAggregate) // Associates record with a key like "<timestamp floored to nearest hour>"
.groupByKey(aggregateSerde, aggregateSerde)
.aggregate(
() -> MyAggregate.EMPTY,
(aggKey, value, aggregate) -> aggregate.merge(value), // I know this is not idempotent, that's a WIP
TimeWindows.of(TimeUnit.HOURS.toMillis(1L))
.until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay
aggregateSerde,
"result_store"
)
.print()
시험의 조각이 어떤 도움이 많이 주시면 감사하겠습니다
Instant roundId = Instant.now().truncatedTo(ChronoUnit.HOURS).minus(9L, ChronoUnit.HOURS);
sendRecord("mytopic", roundId, 3);
sendRecord("mytopic", roundId.plusMillis(15000), 2);
log.info("Waiting a little before sending more usage. (simulating late record)");
Thread.sleep(5000L);
sendRecord("mytopic", roundId.plusMillis(30000), 5);
// Assert stored value is "3".
// It actually is 5 because the last round is accounted for
입니다 : 이것은 내 파이프 라인의보다 완전한 표현이다.
이것이 테스트 설정의 효과라고 생각합니다. 여기에 더 많은 코드를 공유 할 수 있습니까? 예를 들어, "지연 기록"이 처리되지만 다른 시간 창에 표시 될 수 있습니다. 또는 입력 레코드를 쓰는 방법 (타임 스탬프가 레코드에 할당되는 위치/시간)은 후속 어설 션/검증과 호환되지 않을 수 있습니다. –
나는 @ MichaelG.Noll 코드를 더 추가하는 질문을 편집했습니다 – Hartimer
Thanks Hartimer. 'sendRecord'는 정확히 무엇을합니까?두 번째 매개 변수는 위에서 보았던 JSON 페이로드에있는'collectedAt' 타임 스탬프를 설정하고 있습니까? (JSON 페이로드가 레코드 값이라고 가정합니다.) –