2017-12-11 9 views
0

나는 Apache Flink 튜토리얼을 따라 TaxiRide 이벤트 스트림을 정리합니다. 결과 스트림이 콘솔에 인쇄됩니다. 이제는 csv 파일에 기록하고 싶습니다. 내가 뭔가를 컴파일러 오류로 연결되는 DataSet<Tuple1<TaxiRide>> rides1 = filteredRides.writeAsCsv("/resources").setParallelism(1);을 만들고 있어요 때 java.lang.IllegalArgumentException: The writeAsCsv() method can only be used on data streams of tuples.Apache Flink writeAsCsv() 메소드를 사용하여 객체 튜플을 작성하십시오.

DataStreamSink<TaxiRide> rides = filteredRides.writeAsCsv("/resources").setParallelism(1); 

:

 // configure event-time processing 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     // get the taxi ride data stream 
     DataStream<TaxiRide> rides = env.addSource(
       new TaxiRideSource(path, maxEventDelay, servingSpeedFactor)); 

     DataStream<TaxiRide> filteredRides = rides 
       // filter out rides that do not start or stop in NYC 
       .filter(new RideCleansing.NYCFilter()); 

     filteredRides.print(); 

나는 다음하지만 오류가 시도했습니다.

결과로 정리 된 TaxiRide 스트림을 csv 파일에 쓰려면 어떻게해야합니까?

답변

1

DataStreamDataSet은 혼합 할 수없는 별도의 API에 속합니다. 따라서 컴파일 오류가 발생합니다.

"writeAsCsv() 메서드는 튜플의 데이터 스트림에서만 사용할 수 있습니다."라는 오류 메시지가 표시됩니다. 즉, DataStream<TaxiRide> 개체를 DataStream 개의 튜플로 변환해야만 CSV 파일로 쓸 수 있다는 의미입니다. 이것은 간단한 MapFunction으로 수행 할 수 있습니다 : 당신이 DataStreamrideTuples이 있으면 TupleConverter

DataStream<Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> rideTuples = filteredRides 
    .map(new TupleConverter()); 

class TupleConverter implements MapFunction<TaxiRide, Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short>> { 

    public Tuple9<Long, Boolean, DateTime, DateTime, Float, Float, Float, Float, Float, Short> map(TaxiRide ride) { 
    return Tuple9.of(ride.rideId, ride.isStart, ...); 
    } 
} 

로 정의되고, 당신은 CSV 파일에 쓸 수 있습니다.