5

Google PubSub의 일부 데이터를 Python 데이터 흐름을 사용하여 BigQuery로 스트리밍하려고합니다. 내가Pub/Sub에서 BigQuery로 스트리밍

options.view_as(StandardOptions).streaming = True 

을 설정하여 스트리밍 파이프 라인에 다음 코드 https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py를 적응 테스트를 위해 그래서 그때 하위/펍에서

# ADDED THIS 
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15)) 
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) 
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode)) 
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) 
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
     OUTPUT, 
     schema=table_schema, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

주를 읽을 수있는 record_ids 파이프 라인을 변경 : 내가 허용 한 한 나는 그것을 시도 할 때 구글에 의해

(알파) 코드를 실행하는 지금은 오류

Workfl에게 있습니다 아야 실패했습니다. 원인 : (f215df7c8fcdbb00) : 알 수없는 스트리밍 싱크 : 나는 사람이 어떻게 나에게 알려 주시기 바랍니다 수 있습니다,이 파이프 라인은 이제 형 스트리밍의 행복과 관련이 있다고 생각 https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

: BigQuery를

당신은 여기에 전체 코드를 찾을 수 있습니다 스트리밍 파이프 라인에서 bigQuery를 작성 하시겠습니까?

답변

2

빔 파이썬은 스트리밍 파이프 라인에서 BigQuery에 대한 쓰기를 지원하지 않습니다. 지금은 Beam Java를 사용해야합니다. 각각 PubsubIO.readStrings()BigQueryIO.writeTableRows()을 사용할 수 있습니다.

+0

좋습니다, 유진 감사합니다. 나는 파이썬을 사용하기를 희망했다. 이것이 미래에 바뀌는 지 아십니까? Pub/Sub에서 코드를 읽고 Java에서 BigQuery에 코드를 작성하는 예제로 나를 안내 할 수 있습니까? –

+0

이 예제에서는 https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java를 사용합니다. 예, 파이썬은 결국 자바를 따라 잡을 것입니다 (잠재적으로 빔의 현재 개발중인 이식성 프레임 워크를 통해 파이썬 파이프 라인에서 자바 변환을 사용할 수 있습니다).하지만 타임 라인이 어떻게 될지 예측할 수는 없습니다. – jkff