2016-09-09 2 views
2

저는 Apache Flink를 사용하여 프로토 타입 응용 프로그램을 작성했습니다. 이 과정에서 특정 유스 케이스에 대해 org.apache.flink.streaming.api.functions.windowingWindowFunction을 사용하기로했습니다. 그러나 apply() 함수의 본문을 작성하는 동안이 오류가 발생합니다 (아래 코드는 필자가 작성한 응용 프로그램의 코드가 아닙니다. 내 데이터 유형이 다릅니다. Flink의 설명서 사이트에서 사용할 수있는 샘플 코드의 코드입니다) :Scala WindowFunction이 컴파일되지 않습니다.

import scala.collection.Iterable 
import scala.collection.Map 
import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow} 
import org.apache.flink.util.Collector 
import scala.collection.JavaConversions._ 

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = { 
    var count = 0L 
    for (in <- input) { 
     count = count + 1 
    } 
    out.collect(s"Window $window count: $count") 
    } 
} 

컴파일러 불평 :

Error:(16, 7) class MyWindowFunction needs to be abstract, since method apply in trait WindowFunction of type 
(x$1: String, x$2: org.apache.flink.streaming.api.windowing.windows.TimeWindow, 
x$3: Iterable[(String, Long)], 
x$4: org.apache.flink.util.Collector[String])Unit is not defined 
    class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { 

의 매개 변수의 순서를 체크인 (신청); 그들은 옳은 것처럼 보입니다.

어떤 이유로 나는 오류의 정확한 원인을 발견하지 못했습니다. 누군가 나를 솔루션에 찔러 주시겠습니까?

답변

4

이 오류의 원인을 발견했습니다.

아파치 FLINK의 API가 대신 스칼라 동등한하는 java.lang.Iterable을 기대하고 있다는 사실 나에게 분명 무엇 아니었다 :

import java.lang.Iterable // From Java 
import java.util.Map  // From Java 

import org.apache.flink.streaming.api.functions.windowing.WindowFunction 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow 
import org.apache.flink.util.Collector 

import scala.collection.JavaConversions._ // Implicit conversions 

class MyWindowFunction 
    extends WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], 
     collector: Collector[String]): Unit = { 

    // .... 

    } 
} 
: 그래서

class MyWindowFunction extends 
     WindowFunction[(String, Long), String, String, TimeWindow] { 

    override 
    def apply(
     key: String, 
     w: TimeWindow, 
     iterable: Iterable[(String, Long)], // from java.lang.Iterable 
     collector: Collector[String]): Unit = { 

     // .... 
    } 
} 

, 나는 적절하게 수입했다

모두 좋았습니다!