2017-02-19 7 views
0

Apache Flink를 사용하여 Twitter Streaming API로 메시지를 보내려고합니다.Apache Flink - Twitter에서 데이터를 가져올 수 없습니다.

그러나 내 코드는 출력 파일에 아무 것도 쓰지 않습니다. 특정 단어에 대한 입력 데이터를 계산하려고합니다.

Plese 내 예를 확인 :

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.connectors.twitter._ 
import org.apache.flink.api.java.utils.ParameterTool 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint} 
import org.apache.flink.streaming.api.windowing.time.Time 

import scala.collection.JavaConverters._ 


////////////////////////////////////////////////////// 
// Create an Endpoint to Track our terms 
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable { 
    @Override 
    def createEndpoint(): StreamingEndpoint = { 
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0)) 
    val endpoint = new StatusesFilterEndpoint() 
    //endpoint.locations(List(chicago).asJava) 
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava) 
    endpoint 
    } 
} 

object Connection { 
    def main(args: Array[String]): Unit = { 

    val props = new Properties() 

    val params: ParameterTool = ParameterTool.fromArgs(args) 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    env.getConfig.setGlobalJobParameters(params) 
    env.setParallelism(params.getInt("parallelism", 1)) 

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key")) 
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key")) 
    props.setProperty(TwitterSource.TOKEN, params.get("token")) 
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret")) 

    val source = new TwitterSource(props) 
    val epInit = new myFilterEndpoint() 

    source.setCustomEndpointInitializer(epInit) 

    val streamSource = env.addSource(source) 

    streamSource.map(s => (0, 1)) 
     .keyBy(0) 
     .timeWindow(Time.minutes(2), Time.seconds(30)) 
     .sum(1) 
     .map(t => t._2) 
     .writeAsText(params.get("output")) 

    env.execute("Twitter Count") 
    } 
} 

요점은, 내가 오류 메시지가 없으며 내 대시 보드에서 볼 수 있습니다. 내 소스가 데이터를 내 TriggerWindow로 보내고 있습니다. 하지만 데이터를받지 못합니다. enter image description here

한 번에 두 가지 질문이 있습니다.

첫 번째 이유 : 아무것도 가져 오지 않으면 내 소스가 내 TriggerWindow로 바이트를 보내는 이유는 무엇입니까?

Seccond : 내 코드에 트위터에서 데이터를 가져올 수없는 것이 있습니까?

+0

첫 번째 결과는 2 분 후 (즉, 창의 길이) 작성해야합니다. 오래 기다렸 니? TriggerWindow는 데이터를 받았지만 43 초 후에는 파일에 아무 것도 쓰지 않습니다. 너의 코드가 다 좋아 보인다. –

+0

안녕하세요 @DawidWysakowicz, 네, 그렇게 오래 기다립니다. 정확하게이 코드를 2 시간 동안 실행했습니다. 나는 그 질문에 대한 지문을 가져 갔다. 그러나 Flink에서 출력이 없습니다. ( –

답변

1

응용 프로그램 소스가 실제 레코드를 보낸 편지 열을보고 알 수있는 창으로 보내지 않았습니다. 보내지는 바이트는 Flink가 태스크간에 수시로 보내는 제어 메시지에 속합니다. 보다 구체적으로, Flink 작업의 종단 간 대기 시간을 측정하는 데 사용되는 메시지는 LatencyMarker입니다.

코드가 좋게 보입니다. 나는 당신의 코드를 시험해보고 나를 위해 일했습니다. 따라서 트위터 연결 자격 증명에 문제가 있다는 결론을 내 렸습니다. 올바른 자격 증명을 입력했는지 다시 확인하십시오.