2017-10-11 5 views

답변

1

이 PCollection의 크기를 확인하는 방법은 없습니다 :

측면 입력
import apache_beam as beam 
from apache_beam import pvalue 

is_empty_check = (your_pcollection 
        | "Count" >> beam.combiners.Count.Globally() 
        | "Is empty?" >> beam.Map(lambda n: n == 0) 
        ) 

another_pipeline_branch = (
    p 
    | beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check)) 
) 

사용법은 다음과 같다 PCollection이 Java SDK의 일반적인 Collection과 같지 않기 때문에 Countransform (Count.globally() 또는 Combine.combineFn() 등)에 PTransform을 적용하지 않아도됩니다.

데이터가 적용되는 작업 (예 : PTransform)을 위해 데이터가 컬렉션으로 공급되는 바운드 또는 무제한 데이터 수집의 추상화입니다. 또한 그것은 병렬 처리됩니다 (클래스 시작 부분의 P가 제안하는 것처럼).

따라서 각 작업자/노드의 요소 수를 계산하고 이들을 결합하여 값을 얻는 메커니즘이 필요합니다. 그것이 0인지 또는 n인지는 변환이 끝날 때까지 알 수 없습니다.

1

사용중인 SDK를 지정하지 않았으므로 Python을 사용합니다. 이 코드는 Java로 쉽게 이식 가능합니다.

요소의 전체 계산을 적용한 다음 간단한 비교를 적용하여 숫자 값을 부울로 매핑 할 수 있습니다. 당신은 좌우 입력 할 pvalue.AsSingleton 기능을 사용하여이 값을 다음과 같이 할 수 있습니다 :

def do_something(element, is_empty): 
    if is_empty: 
     # yes 
    else: 
     # no