2017-12-18 14 views
0

현재 Flink 프로젝트를 진행 중입니다. 이 프로젝트의 주요 아이디어는 JSON (네트워크 로그)의 데이터 스트림을 읽고, 상호 연관시키고, 다른 JSON의 정보가 결합 된 새로운 JSON을 생성하는 것입니다.Flink 적용 기능 on timeWindow

현재 JSON을 읽고 로그를 생성하는 컴퓨터를 기반으로 KeyedStream을 생성 한 다음 5 초의 창 스트림을 생성 할 수 있습니다.

다음 단계는 적용 함수를 사용하여 각 JSON의 정보를 결합하는 것입니다. 나는 그것을 어떻게하는지 혼란 스럽다. 내가 현재 가지고

코드는 다음과 같은 하나입니다

DataStream<Tuple2<String,JSONObject>> MetaAlert = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new generateMetaAlert()); 




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> { 

     @Override 
     public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2, 
       Collector<Tuple2<String, JSONObject>> arg3) throws Exception { 


     } 

.apply (새 generateMetaAlert()) 일부가 다음 오류와 함께 불평 :

방법이 적용

(WindowFunction, R, Tuple, TimeWindow>)을 인수 (MetaAlertGenerator.generateMetaAlert)에 적용 할 수 없습니다.

다른 코드 구조 제안은 내가 작성한 것과 다릅니다. 컴파일러가 결정할 수 없기 때문에 Tuple을해야합니다 사용자 정의 WindowFunction (3 필드)에서 키의 유형 (익명 클래스를 사용하지 않고) keyBy 기능을 적용 할 때

당신의 도움이

+0

내 생각이이 DUPLI입니다 [이 질문]의 목록 (https://stackoverflow.com/questions/47033981/probleme-with-apply-function-windowfunction-in-flink). 해답이 문제를 해결하면 확인하고 종료하십시오. –

답변

0

을 위해 사전에 감사합니다 열쇠의 타입.

public class Test { 

    public Test() { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
     DataStream<String> events = env.readTextFile("datastream.log"); 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(0) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 

    } 

    public class JSONObject { 
    } 

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> { 
     @Override 
     public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 

그러나 가장 간단한 방법은 당신이 String 유형 유지할 수 있도록 익명 클래스를 사용하는 것입니다 :

를이 코드는 (I 더미 코드로 공백을 채우기 위해 노력 고려) 오류없이 컴파일 당신이 클래스를 유지하려면,하지만 당신은 또한 그대로 키 당신의 유형을 유지하려는 경우
DataStream<Tuple2<String, JSONObject>> MetaAlert 
     = events 
     .flatMap(new JSONParser()) 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() { 
      @Override 
      public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 
       // Your code here 
      } 
     }); 

마지막으로, 당신은 구현할 수있는 KeySelector :

public class Test { 

    public Test() { 

     DataStream<Tuple2<String, JSONObject>> MetaAlert 
       = events 
       .flatMap(new JSONParser()) 
       .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() { 
        @Override 
        public String getKey(Tuple2<String, JSONObject> json) throws Exception { 
         return json.f0; 
        } 
       }) 
       .timeWindow(Time.seconds(5)) 
       .apply(new GenerateMetaAlert()); 
    } 

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> { 
     @Override 
     public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception { 

     } 
    } 

} 
+0

감사합니다. 작동합니다! –