3

카프카에서 여러 개의 파티션을 주문할 수없고 단일 파티션의 그룹 내 단일 소비자에 대해서만 파티션 순서가 보장된다는 사실을 알고 있습니다. 그러나 Kafka Streams 0.10을 사용하면 이것을 달성 할 수 있습니까? 타임 스탬프 기능을 사용하여 각 파티션의 각 메시지가 소비자 측에서 주문을 유지할 경우 Kafka Streams 0.10으로 말하도록 허용 할 수 있습니까? 우리가 모든 메시지를 받았다고 가정하면 소비 된 타임 스탬프를 기반으로 모든 파티션을 정렬 할 수없고 소비를 위해 별도의 주제로 전달할 수 있습니까?카프카 다중 파티션 주문

지금 주문을 유지해야하지만 이것은 단일 소비자 스레드가있는 단일 파티션을 갖는 것을 의미합니다. 병렬성을 높이기 위해이를 여러 파티션으로 변경하려고했지만 어떻게 든 순서대로 가져 오려고했습니다.

의견이 있으십니까? 고맙습니다.

답변

0

나는 카프카 스트림을 사용하지 않지만 일반 소비자와 함께이 작업을 수행 할 수 있습니다.

먼저 파티션을 정렬합니다. 여기서는 이미 소비자 그룹에서 원하는 오프셋 또는 오프셋을 찾았다 고 가정합니다.

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) { 

    Set<TopicPartition> pollPartitions = events.partitions(); 
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>(); 
    for (TopicPartition tp : pollPartitions) { 
     orderEvents.add(events.records(tp)); 
    } 
    // order the list by the first event, each list is ordered internally also 
    orderEvents.sort(new PartitionEventListComparator()); 
    return orderEvents; 
} 

/** 
* Used to sort the topic partition event lists so we get them in order 
*/ 
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> { 

    @Override 
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) { 
     long c1 = list1.get(0).timestamp(); 
     long c2 = list2.get(0).timestamp(); 
     if (c1 < c2) { 
      return -1; 
     } else if (c1 > c2) { 
      return 1; 
     } 

     return 0; 
    } 


} 

그런 다음 파티션을 라운드 로빈하여 이벤트를 순서대로 얻으십시오. 실제로이 방법이 작동하는 것으로 나타났습니다.

   ConsumerRecords<String, String> events = consumer.poll(500); 
       int totalEvents = events.count(); 
       log.debug("Polling topic - recieved " + totalEvents + " events"); 
       if (totalEvents == 0) { 
        break; // no more events 
       } 

       List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events); 

       int cnt = 0; 
       // Each list is removed when it is no longer needed 
       while (!orderEvents.isEmpty() && sent < max) { 
        for (int j = 0; j < orderEvents.size(); j++) { 
         List<ConsumerRecord<String, String>> subList = orderEvents.get(j); 
         // The list contains no more events, or none in our time range, remove it 
         if (subList.size() < cnt + 1) { 
          orderEvents.remove(j); 
          log.debug("exhausted partition - removed"); 
          j--; 
          continue; 
         } 
         ConsumerRecord<String, String> event = subList.get(cnt); 
         cnt++ 
} 
8

당신이 이러한 상황에 직면하고있는 두 가지 문제가 있습니다

  1. 카프카의 여러 파티션이 주제와 멀티에 대한 카프카 (주제의) 글로벌 순서를 보장하지 않는 사실 - 파티션 주제.
  2. 시간 및 타임 스탬프와 관련된 주제 및 해당 파티션에 대한 늦은 도착/부재 중 메시지의 가능성.

가 나는 카프카의 여러 파티션을 주문 할 수없는 것을 알고, 그 파티션 주문은 (단일 파티션) 그룹 내에서 하나의 소비자에 대한 보장됩니다. 그러나 Kafka Streams 0.10을 사용하면 이것을 달성 할 수 있습니까?

짧은 대답은 다음과 같습니다. 아니요, 여러 파티션이있는 카프카 항목에서 읽을 때 여전히 전체 주문을 수행 할 수 없습니다.

또한 "파티션 순서 지정"은 "파티션의 메시지 오프셋을 기준으로 파티션 순서 지정"을 의미합니다. 주문 보증은 메시지의 타임 스탬프와 관련이 없습니다.

마지막 순서에는 보장 max.in.flight.requests.per.connection == 1 경우 : 아파치 카프카 문서에서

Producer configuration settings : max.in.flight.requests.per.connection (기본값 : 5) : 클라이언트가 차단되기 전에 하나의 연결에 보낼 확인되지 않은 요청의 최대 수 . 이 설정을 1보다 크게 설정하고 실패한 전송이있는 경우 재시도 (재시도가 사용 가능한 경우)로 인해 메시지 순서가 변경 될 위험이 있습니다.

이 시점에서 우리는 소비자 행동 (카파의 원래 질문에서 출발 한 것)과 카프카의 생산자 행동에 대해 이야기합니다.

각 파티션의 각 메시지가 소비자 측에서 주문을 유지할 수 있도록 타임 스탬프 기능을 사용하면 Kafka Streams 0.10으로 말할 수있게 되었습니까?

타임 스탬프 기능을 사용해도 "각 파티션의 각 메시지가 순서를 유지"하지 못합니다. 왜? 늦게 도착/부재 중 메시지의 가능성 때문입니다.

파티션은 오프셋으로 정렬되지만 타임 스탬프별로 정렬되지는 않습니다. 파티션의 다음과 같은 내용이 실제로 완벽하게 가능하다 (타임 스탬프는 일반적이다 밀리 초 - 이후 - 더 - 시대) :

Partition offsets  0 1 2 3 4 5 6 7 8 
Timestamps   15 16 16 17 15 18 18 19 17 
              ^^ 
             oops, late-arriving data! 

무엇 늦게 도착/순서가 잘못된 메시지입니다? 전 세계에 흩어져있는 센서가 있다고 가정 해보십시오. 센서는 모두 해당 지역의 온도를 측정하고 최신 측정 값을 카프카 (Kafka) 주제로 보냅니다. 일부 센서는 신뢰할 수없는 인터넷 연결성을 가질 수 있으므로 측정 시간이 분, 시간 또는 심지어 며칠 지연 될 수 있습니다. 결과적으로 지연된 측정 결과는 카프카에게 전달되지만, 늦게 도착할 것입니다. 도시의 휴대 전화와 동일 : 일부는 배터리/에너지가 부족하여 데이터를 전송하기 전에 충전해야하거나 지하로 운전할 때 인터넷 연결이 끊어 질 수 있습니다.

모든 메시지를 받으면 소비 된 타임 스탬프에 따라 모든 파티션을 정렬 할 수없고 소비를 위해 별도의 주제로 전달할 수 있습니까?

이론적으로는 그렇지만 사실은 매우 어렵습니다. "우리가 모든 메시지를 받는다"는 가정은 스트리밍 시스템에 실제로 도전적입니다 (배치 처리 시스템의 경우에도 마찬가지입니다. 그러나 늦게 도착한 데이터의 문제는 여기에서 종종 간단히 무시됩니다). 늦게 도착하는 데이터의 가능성 때문에 실제로 "모든 메시지"를 받았는지 여부는 알 수 없습니다. 늦게 도착하는 메시지를 받으면 무엇을하고 싶습니까? 메시지를 다시 "다시 정렬"하거나 다시 정렬 (늦게 도착한 메시지 포함)하거나 늦게 도착한 메시지를 무시 (잘못된 결과 계산)? 어떤면에서 "모두 정렬 해 봅시다"라는 그러한 전역 질서는 매우 값 비싸거나 최선의 노력입니다.