
Writing to BigQuery inside a ParDo function

I would like to call a beam.io.Write(beam.io.BigQuerySink(..)) operation inside a ParDo function in order to generate a separate BigQuery table for each key in the PCollection (I am using the Python SDK). Here are two similar threads which, unfortunately, did not help:

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from dataflow pipelines

When I run the code below, the rows for the first key get inserted into BigQuery and then the pipeline fails with the error below. I would really appreciate any suggestions about what I am doing wrong or how to fix it.

Pipeline code:

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query)) 

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
            beam.io.BigQuerySink(
                'PROJECT-NAME:analytics.first_table',  # will be replaced by a dynamic name based on key
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
        ### End block ###
        return [value]


### Following part works fine ### 
filtered = (rows | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics') 
        | 'apply_projection' >> beam.Map(apply_projection, projection_fields) 
        | 'group_by_key' >> beam.GroupByKey() 
        | 'par_upload_to_bigquery' >> beam.ParDo(par_upload()) 
        | 'flat_map' >> beam.FlatMap(lambda l: l) #this step is just for testing 
       ) 

### This part works fine if I comment out the 'write_to_bq' block above 
filtered | 'write_to_bq' >> beam.io.Write(
     beam.io.BigQuerySink(
      'PROJECT-NAME:analytics.another_table', 
      schema=schema, 
      write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, 
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) 
     ) 

Error message (after the first answer):

INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table. 
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})} 
ERROR:root:Error while visiting par_upload_to_bigquery 
Traceback (most recent call last): 
    File "split_events.py", line 137, in <module> 
    run() 
    File "split_events.py", line 132, in run 
    p.run() 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run 
    return self.runner.run(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run 
    super(DirectPipelineRunner, self).run(pipeline) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run 
    pipeline.visit(RunVisitor(self)) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit 
    part.visit(visitor, pipeline, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit 
    visitor.visit_transform(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform 
    return m(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper 
    func(self, pvalue, *args, **kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo 
    runner.process(v) 
    File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483) 
    File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311) 
    File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677) 
    File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process 
    return self.run(self.dofn.process, context, args, kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run 
    result = method(context, *args, **kwargs) 
    File "split_events.py", line 73, in process 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__ 
    return self.transform.__ror__(pvalueish, self.label) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__ 
    return _MaterializePValues(cache).visit(result) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit 
    return self._pvalue_cache.get_unwindowed_pvalue(node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue 
    return [v.value for v in self.get_pvalue(pvalue)] 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue 
    value_with_refcount = self._cache[self.key(pvalue)] 
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']" 

Edit:

I had not realized that my value needed to be a PCollection.

I have now changed my code to the following (which is probably very inefficient). It works fine locally:

key_pipe = p | 'pipe_' + key >> beam.Create(value) 
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..)) 

However, with BlockingDataflowPipelineRunner the pipeline fails with the following error :-(

JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331) 
    op.start() 
    File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193) 
    def start(self): 
    File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499) 
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5) 

I still think the first answer is correct: you cannot add more steps to the pipeline from within a DoFn that is running as part of the pipeline. The fact that this works on the DirectRunner is a bug. If you want this kind of data-dependent write, for now you will have to interact with the BigQuery API directly instead of using a BigQuerySink, as the other thread suggests. –

Answer


The only ways to perform the BigQuery write are to use the BigQuery API directly in your ParDo, or to use a client.
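
For illustration, a minimal sketch of the first option (calling the BigQuery REST API directly from inside the DoFn) could look something like the code below. This is not from the original post: the project and dataset names are the placeholders used in the question, the old context-based DoFn signature is assumed, and the streaming insert requires the per-key tables to already exist (or to be created beforehand via tables().insert()).

import apache_beam as beam
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


class direct_bq_write(beam.DoFn):
    """Sketch: streams each key's rows into a BigQuery table named after the key."""

    def start_bundle(self, context=None):
        # Build a BigQuery v2 client once per bundle instead of once per element.
        credentials = GoogleCredentials.get_application_default()
        self._bq = discovery.build('bigquery', 'v2', credentials=credentials)

    def process(self, context):
        key, rows = context.element  # rows is a plain iterable here, not a PCollection
        self._bq.tabledata().insertAll(
            projectId='PROJECT-NAME',   # placeholders, as in the question
            datasetId='analytics',
            tableId=key,                # one table per key
            body={'rows': [{'json': row} for row in rows]}
        ).execute()
        return [key]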

The code you wrote puts a beam.io.BigQuerySink() into a DoFn of a Dataflow ParDo class. The ParDo class expects to work on a PCollection, like filtered in the working code example. That is not the case for the non-working code, which operates on value.

The easiest option would be to take a look at the gcloud-python BigQuery function insert_data() and put it inside your ParDo.
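
As a rough illustration of that suggestion, a per-key DoFn using the gcloud-python client might look like the sketch below. The exact import path and method names (Client, dataset(), table(), create(), insert_data()) depend on the library version, and the schema fields used here are purely hypothetical; they would have to match the projection applied earlier in the pipeline.

import apache_beam as beam
from gcloud import bigquery


class par_upload_direct(beam.DoFn):
    """Sketch: creates the per-key table if needed, then streams the rows into it."""

    def process(self, context):
        key, rows = context.element
        client = bigquery.Client(project='PROJECT-NAME')          # placeholder project
        table = client.dataset('analytics').table(key)            # one table per key
        table.schema = [bigquery.SchemaField('topic', 'STRING'),  # hypothetical fields
                        bigquery.SchemaField('count', 'INTEGER')]
        if not table.exists():
            table.create()
        # insert_data() expects one tuple per row, in schema order.
        errors = table.insert_data([(r['topic'], r['count']) for r in rows])
        if errors:
            raise RuntimeError('BigQuery insert errors: %s' % errors)
        return [key]

Building the client for every element is wasteful; in practice it would be created once per bundle, as in the earlier REST sketch, but the shape above keeps the example short.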