2017-10-27 8 views
0

다른 처리 (BulkIteration) 전에 DataSet에서 count()를 사용하면 아파치 플 링크가 실행됩니다 count()에 대한 계획을 세우고 다른 작업을 건너 뜁니다. 로그에서 로그에 대해 아무것도 찾을 수 없었습니다.Apache Flink : DataSet에서 count()를 사용할 때만이 작업이 실행됩니다.

더 많은 것은 내 IDE에서 발생하지 않습니다. 거기에 모든 작업이 작동합니다. WebUI를 통해 업로드 할 때만 이런 종류의 문제가 발생합니다.

So : 일반적인 문제입니까? 직접 계산할 필요없이 어떻게 해결할 수 있습니까?

감사합니다.

UPDATE :

코드는 이런 비슷한 않습니다 (물론, 나도 알아,이 예는 생산적인 코드를 잘 설계되지 않았는지,하지만 내 문제를 보여줍니다).

import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.aggregation.Aggregations; 
import org.apache.flink.api.java.tuple.Tuple1; 

import java.util.LinkedList; 
import java.util.List; 
import java.util.Random; 

public class CountProblemExample { 

    public static void main(String[] args) throws Exception { 
     Random rnd = new Random(); 

     int randomNumber = 100000 + rnd.nextInt(100000); 

     List<Double> doubles = new LinkedList<>(); 
     for (int i = 0; i < randomNumber; i++) { 
      doubles.add(rnd.nextDouble()); 
     } 

     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

     DataSet<Double> doubleDataSet = env.fromCollection(doubles); 

     final int count = (int)doubleDataSet.count(); // In the UI there the code stops further execution 

     DataSet<Double> avgSet = doubleDataSet 
       .map(new MapFunction<Double, Tuple1<Double>>() { 
        @Override 
        public Tuple1<Double> map(Double value) throws Exception { 
         return new Tuple1<>(value); 
        } 
       }) 
       .aggregate(Aggregations.SUM, 0) 
       .map(new MapFunction<Tuple1<Double>, Double>() { 
        @Override 
        public Double map(Tuple1<Double> t) throws Exception { 
         double avg = 0; 
         if (count > 0) { 
          avg = t.f0/count; 
         } 

         return avg; 
        } 
       }); 

     double avg = avgSet 
       .collect() 
       .get(0); 

     System.out.println(avg); 
    } 

} 
+0

코드를 게시 할 수 있습니까? –

+0

예, 나중에 코드를 업데이트하겠습니다. 고맙습니다! –

+0

나는 내 게시물을 편집하고 코드 예제를 포함 시켰습니다. 기꺼이, 만약 당신이 나를 도울 수있어! –

답변

1

아마도 ExecutionEnvironment.execute()으로 전화하는 걸 잊었을 것입니다. 해당 메서드를 호출하기 전에 DataSet 작업이 실행되지 않습니다.

DataSet.count()DataSet.collect() 내부 트리거도 실행됩니다.

+0

좋은 힌트, 아니요. 나중에 collect() 호출이 있기 때문에 나는 그것을 잊지 않았다. 그 것을 잊어 버렸다면, IDE에서도 작동하지 않을 것입니다. –

+0

코드 예제를 추가했습니다. 아마도 그것에 대해 sth라고 말할 수 있을까요? –