2

Pub/Sub에서 이벤트를 읽고 BigQuery에 기록하는 Streaming Dataflow 작업을 작성하려고합니다. 레코드 ID를 사용하는 경우Google Pub/Sub to Dataflow, 레코드 ID가 중복 된 것을 피하십시오.

문서에 따르면, 데이터 흐름이 중복 메시지 전달을 감지 할 수 있습니다 (참조 : https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids을)도

하지만를, 난 여전히 (0.0002 % 내외) 일부 중복이이 기록 ID를 사용하여.

내가 뭔가를 놓쳤습니까?

편집 : 다음

Message 
     .builder() 
     .data(new String(Base64.encodeBase64(json.getBytes()))) 
     .attributes("myid", id, "mytimestamp", timestamp.toString) 
     .build() 

내가 흐름에 술집/하위에서 메시지를 읽고 저장하는 Spotify scio를 사용 :

내가 다음 snipplet에 메시지를 게시 할 Spotify Async PubSub Client를 사용

val input = sc.withName("ReadFromSubscription") 
       .pubsubSubscription(subscriptionName, "myid", "mytimestamp") 
input 
    .withName("FixedWindow") 
    .withFixedWindows(windowSize) // apply windowing logic 
    .toWindowed // convert to WindowedSCollection 
    // 
    .withName("ParseJson") 
    .map { wv => 
     wv.copy(value = TableRow(
     "message_id" -> (Json.parse(wv.value) \ "id").as[String], 
     "message" -> wv.value) 
    ) 
    } 
    // 
    .toSCollection // convert back to normal SCollection 
    // 
    .withName("SaveToBigQuery") 
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND) 

창 크기는 1 분입니다.

메시지를 삽입하는 데 몇 초 밖에 걸리지 않아 이미 BigQuery에서 중복 된 메시지가 있습니다. 그들을보고

SELECT 
    COUNT(message_id) AS TOTAL, 
    COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table 

//returning 273666 273564 

그리고이 하나 :

나는 중복 계산이 쿼리를 사용

SELECT * 
FROM my_dataset.my_table 
WHERE message_id IN (
    SELECT message_id 
    FROM my_dataset.my_table 
    GROUP BY message_id 
    HAVING COUNT(*) > 1 
) ORDER BY message_id 

//returning for instance: 
row|id         | processed_at   | processed_at_epoch  
1 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410367 { ...json1... } 
2 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410368 { ...json1... } 
3 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973907 { ...json2... } 
4 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973741 { ...json2... } 
5 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:09:10 UTC 1486044550489 { ...json3... } 
6 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:08:52 UTC 1486044532680 { ...json3... } 
+0

레코드 ID를 어떻게 사용하고 복제본을 측정하는지 자세히 설명해 주시겠습니까? "데이터 흐름은 Pub/Sub에 10 분 이상 떨어져 게시 된 동일한 레코드 ID 값을 가진 메시지에 대해이 중복 제거를 수행하지 않습니다."라는 문서를 참고하십시오. 그것이 당신의 관찰 된 중복을 야기 할 수 있습니까? –

+0

더 많은 정보를 추가했습니다 :) –

답변

1

BigQuery documentation states 중복이 도착 드문 경우가있을 수 있음 :

  1. "BigQuery는이 ID를 적어도 1 분 이상 기억합니다."- 데이터 흐름이 1 분 이상 걸렸을 때 BigQuery를 삽입하면 중복이 허용 될 수 있습니다. 파이프 라인의 로그를보고 이것이 맞는지 확인할 수 있습니다.
  2. "드물게 예기치 않게 연결이 끊어지는 Google 데이터 센터의 경우 자동 중복 제거가 불가능할 수 있습니다."

manually removing duplicates에 대한 안내를 따르세요. 또한 각 레코드와 함께 사용 된 insertID을 확인하여 데이터 흐름 측면 (동일한 레코드에 대해 다른 insertID 초 생성) 또는 BigQuery 쪽 (해당 insertID에 기반한 행 중복 제거 실패)에 문제가 있는지 확인할 수 있습니다. .