다른 처리 (BulkIteration) 전에 DataSet에서 count()를 사용하면 아파치 플 링크가 실행됩니다 count()에 대한 계획을 세우고 다른 작업을 건너 뜁니다. 로그에서 로그에 대해 아무것도 찾을 수 없었습니다.Apache Flink : DataSet에서 count()를 사용할 때만이 작업이 실행됩니다.
더 많은 것은 내 IDE에서 발생하지 않습니다. 거기에 모든 작업이 작동합니다. WebUI를 통해 업로드 할 때만 이런 종류의 문제가 발생합니다.
So : 일반적인 문제입니까? 직접 계산할 필요없이 어떻게 해결할 수 있습니까?
감사합니다.
UPDATE :
코드는 이런 비슷한 않습니다 (물론, 나도 알아,이 예는 생산적인 코드를 잘 설계되지 않았는지,하지만 내 문제를 보여줍니다).
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple1;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
public class CountProblemExample {
public static void main(String[] args) throws Exception {
Random rnd = new Random();
int randomNumber = 100000 + rnd.nextInt(100000);
List<Double> doubles = new LinkedList<>();
for (int i = 0; i < randomNumber; i++) {
doubles.add(rnd.nextDouble());
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Double> doubleDataSet = env.fromCollection(doubles);
final int count = (int)doubleDataSet.count(); // In the UI there the code stops further execution
DataSet<Double> avgSet = doubleDataSet
.map(new MapFunction<Double, Tuple1<Double>>() {
@Override
public Tuple1<Double> map(Double value) throws Exception {
return new Tuple1<>(value);
}
})
.aggregate(Aggregations.SUM, 0)
.map(new MapFunction<Tuple1<Double>, Double>() {
@Override
public Double map(Tuple1<Double> t) throws Exception {
double avg = 0;
if (count > 0) {
avg = t.f0/count;
}
return avg;
}
});
double avg = avgSet
.collect()
.get(0);
System.out.println(avg);
}
}
코드를 게시 할 수 있습니까? –
예, 나중에 코드를 업데이트하겠습니다. 고맙습니다! –
나는 내 게시물을 편집하고 코드 예제를 포함 시켰습니다. 기꺼이, 만약 당신이 나를 도울 수있어! –