Apache Flink를 사용하여 Twitter Streaming API로 메시지를 보내려고합니다.Apache Flink - Twitter에서 데이터를 가져올 수 없습니다.
그러나 내 코드는 출력 파일에 아무 것도 쓰지 않습니다. 특정 단어에 대한 입력 데이터를 계산하려고합니다.
Plese 내 예를 확인 :
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
@Override
def createEndpoint(): StreamingEndpoint = {
//val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
val endpoint = new StatusesFilterEndpoint()
//endpoint.locations(List(chicago).asJava)
endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
endpoint
}
}
object Connection {
def main(args: Array[String]): Unit = {
val props = new Properties()
val params: ParameterTool = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setParallelism(params.getInt("parallelism", 1))
props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
props.setProperty(TwitterSource.TOKEN, params.get("token"))
props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))
val source = new TwitterSource(props)
val epInit = new myFilterEndpoint()
source.setCustomEndpointInitializer(epInit)
val streamSource = env.addSource(source)
streamSource.map(s => (0, 1))
.keyBy(0)
.timeWindow(Time.minutes(2), Time.seconds(30))
.sum(1)
.map(t => t._2)
.writeAsText(params.get("output"))
env.execute("Twitter Count")
}
}
요점은, 내가 오류 메시지가 없으며 내 대시 보드에서 볼 수 있습니다. 내 소스가 데이터를 내 TriggerWindow로 보내고 있습니다. 하지만 데이터를받지 못합니다.
한 번에 두 가지 질문이 있습니다.
첫 번째 이유 : 아무것도 가져 오지 않으면 내 소스가 내 TriggerWindow로 바이트를 보내는 이유는 무엇입니까?
Seccond : 내 코드에 트위터에서 데이터를 가져올 수없는 것이 있습니까?
첫 번째 결과는 2 분 후 (즉, 창의 길이) 작성해야합니다. 오래 기다렸 니? TriggerWindow는 데이터를 받았지만 43 초 후에는 파일에 아무 것도 쓰지 않습니다. 너의 코드가 다 좋아 보인다. –
안녕하세요 @DawidWysakowicz, 네, 그렇게 오래 기다립니다. 정확하게이 코드를 2 시간 동안 실행했습니다. 나는 그 질문에 대한 지문을 가져 갔다. 그러나 Flink에서 출력이 없습니다. ( –