2017-03-27 3 views
1

여기에 문제가 있습니다 :실제로 늦게 기록을 삭제하는 방법은 무엇입니까?

1 시간짜리 버킷에서이 숫자 중 MAX를 수집하려고합니다. 여기서 버킷에 최대 3 시간의 지연을 허용합니다.

tumbling windows의 경우처럼 들립니다.

는 여기에 지금까지이 작업은 다음과 같습니다

stream.aggregate(
    () -> 0L, 
     (aggKey, value, aggregate) -> Math.max(value, aggregate), 
     TimeWindows.of(TimeUnit.HOURS.toMillis(1L)).until(TimeUnit.HOURS.toMillis(3L)), 
     Serdes.Long(), 
     "my_store" 
) 

우선 나는 이것이 실제로 테스트를 주어진 일이 발생 확인할 수 없습니다. timestamp는 TimestampExtractor를 통해 추출되며 Thread.sleep으로 지연을 시뮬레이트합니다 (테스트를 위해 창을 더 작은 값으로 설정 함). 그러나 "늦은 레코드"는 여전히 처리됩니다.

일반 창에는 예제가 거의없는 것 같습니다. SessionWindows에 대한 통합 테스트가 있지만 그게 전부입니다. 나는 개념을 정확하게 이해하고 있는가?

편집 2

샘플 JUnit 테스트. 그것이 큰이기 때문에 나는 그것을 Gist를 통해 공유합니다.

https://gist.github.com/Hartimer/6018a731753846c1930429716703e5a6

EDIT

데이터 포인트는 (데이터가 수집되었을 때의) 타임 스탬프를 가지고 (기타 코드를 추가)의 데이터 값을 수집 시스템의 호스트.

{ 
    "collectedAt": 12314124134, // timestamp 
    "hostname": "machine-1", 
    "reading": 3 
} 

사용자 정의 타임 스탬프 추출기를 사용하여 collectedAt을 가져옵니다.

source.map(this::fixKey) // Associates record with a key like "<timestamp>:<hostname>" 
    .groupByKey(Serdes.String(), roundDataSerde) 
    .aggregate(
     () -> RoundData.EMPTY_ROUND, 
      (aggKey, value, aggregate) -> max(value, aggregate), 
      TimeWindows.of(TimeUnit.HOURS.toMillis(1L)) 
        .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay 
      roundDataSerde, 
      "entries_store" 
) 
    .toStream() 
    .map(this::simpleRoundDataToAggregate) // Associates record with a key like "<timestamp floored to nearest hour>" 
    .groupByKey(aggregateSerde, aggregateSerde) 
    .aggregate(
     () -> MyAggregate.EMPTY, 
      (aggKey, value, aggregate) -> aggregate.merge(value), // I know this is not idempotent, that's a WIP 
      TimeWindows.of(TimeUnit.HOURS.toMillis(1L)) 
        .until(TimeUnit.SECONDS.toMillis(1L)), // For testing I allow 1 second delay 
      aggregateSerde, 
      "result_store" 
) 
    .print() 

시험의 조각이 어떤 도움이 많이 주시면 감사하겠습니다

Instant roundId = Instant.now().truncatedTo(ChronoUnit.HOURS).minus(9L, ChronoUnit.HOURS); 
    sendRecord("mytopic", roundId, 3); 
    sendRecord("mytopic", roundId.plusMillis(15000), 2); 

    log.info("Waiting a little before sending more usage. (simulating late record)"); 
    Thread.sleep(5000L); 

    sendRecord("mytopic", roundId.plusMillis(30000), 5); 

    // Assert stored value is "3". 
    // It actually is 5 because the last round is accounted for 

입니다 : 이것은 내 파이프 라인의보다 완전한 표현이다.

+0

이것이 테스트 설정의 효과라고 생각합니다. 여기에 더 많은 코드를 공유 할 수 있습니까? 예를 들어, "지연 기록"이 처리되지만 다른 시간 창에 표시 될 수 있습니다. 또는 입력 레코드를 쓰는 방법 (타임 스탬프가 레코드에 할당되는 위치/시간)은 후속 어설 션/검증과 호환되지 않을 수 있습니다. –

+0

나는 @ MichaelG.Noll 코드를 더 추가하는 질문을 편집했습니다 – Hartimer

+0

Thanks Hartimer. 'sendRecord'는 정확히 무엇을합니까?두 번째 매개 변수는 위에서 보았던 JSON 페이로드에있는'collectedAt' 타임 스탬프를 설정하고 있습니까? (JSON 페이로드가 레코드 값이라고 가정합니다.) –

답변

2

나는 Hartimer에 의한 자기 대답은 실제로 잘못 생각 . 적어도 내 자신의 지식에 대해서는 어떻게되는지 설명하려고 노력하겠습니다. :-)

  • 지연된 데이터는 구성된 타임 스탬프 추출기를 통해 응용 프로그램에 대해 구성된 시간 의미에 따라 처리됩니다. @ Hartimer의 경우, 이것은 이벤트 시간입니다 (여기서 사용자 정의 타임 스탬프 추출기가 사용됩니다).
  • FWIW, 처리 시간의 경우 정의 상으로는 늦게 도착한 레코드가 없습니다. 모든 레코드가 "적시에"도착합니다. "늦게 도착한"레코드 (이 컨텍스트에서는 그러한 레코드가 없음)는 현재 창에 포함되지만 이전 레코드로 되돌아 가지 않습니다.
  • 창 보유 시간을 설정하는 TimeWindows#until() 호출은 보유 시간이 인 입니다. Kafka는 구성된 보유 시간보다 "조금 더"(여기서는 의도적으로 애매한 것으로, 아래 참조) 창을 계속 볼 수 있습니다. 이런 이유로 @Hartimer의 테스트와 같은 엄격한 테스트는 직관적으로 기대할 수있는 결과를 산출하지 못할 수 있습니다. 실제로 하한되는 창 보유 시간에 관련하여 내부적으로 어떻게됩니까

는 조금 까다로운 (아마도이 ​​질문의 범위를 넘어), 내가 설명하려고 보류가 아니라면 그 나를위한 특별한 요청.

업데이트 : 그것을 던져해야하기 때문에는 또한, 질문의 조각이 코드도 작동 안되는 IllegalArgumentException :

TimeWindows.of(TimeUnit.HOURS.toMillis(1L)) 
      .until(TimeUnit.SECONDS.toMillis(1L)) 

요구 사항이며, 그 각각의 입력 매개 변수, until() >= of()합니다. 1 시간 크기의 창을 정의 할 수는 없지만 보존 기간은 1 초입니다 (보존 기간은 여기에서 1 시간 이상이어야합니다).

업데이트 2 : 배경 화면에서는 로컬 창 저장소의 세그먼트 파일을 만들거나 관리하는 데 TimeWindows#until() 설정이 사용됩니다. 창 세그먼트가 있으면 해당 창에 대해 늦게 도착한 레코드가 허용됩니다. 세그먼트를 제거/만료하는 방법에 대한 부분은 건너 뛸 것입니다. 왜냐하면 코드를 파헤쳐 야 할 필요가 있기 때문입니다.

+0

답장을 보내 주셔서 감사합니다! Gist 공유의 단위 테스트는'IllegalArgumentException'을 던지지 않으며,'advanceBy'가 지정되고'advanceBy() Hartimer

+0

아마 IAE는 최신 카프카 0.10에서만 던져 질 것이다. 2 - 아마도 이전 버전을 사용하고 있습니까? –

+0

'0.10.1.1'을 사용하고 있습니다. – Hartimer

0

나는 내 자신의 문제를 발견했다. 그것은 TimestampExtractor으로 귀결되며 "지연된 기록"을 평가하기 위해 어떤 가치를 사용 했습니까? 데이터가 기록 될 때

  • 처리 시간 :

    • 이벤트 타임는 상기 데이터가 수신 된

      카프카 스트림 측면에서

      세 개의 "시간"(see here)이 있습니다 스트림 프로세서에 의해

    • 섭취시 : (질문에 관련 없음)

    예를 들어, 실제로 이벤트 시간을 사용하여 지연되었지만 지연된 레코드를 나타내는 것은 아닙니다. 데이터를 수집 한 사람은이 값을 시간에 대한 로컬 인식 (최소한 사용 사례)으로 설정합니다.

    중요 날짜는 처리 시간은입니다. 이벤트가 생성 된시기와 관계없이 이벤트를 수신하는 데 소요 된 시간. 내 집계는 이미 "이벤트 시간"별로 그룹화를 처리합니다.

    이제 전달되는 테스트의 업데이트 된 버전으로 새로운 Gist를 만들었습니다. 추가 필드 receivedAt가 추가되어 "처리 시간"을 시뮬레이션했습니다.

    https://gist.github.com/Hartimer/c79569ad517ab95d08dbe8e84bfa6789