2017-02-19 166 views
0

我試圖使用Apache Flink獲取Twitter流API的一些消息。Apache Flink - 無法從Twitter獲取數據

但是,我的代碼沒有在輸出文件中寫入任何內容。我正在計算特定單詞的輸入數據。

普萊舍檢查我的例子:

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.streaming.connectors.twitter._ 
import org.apache.flink.api.java.utils.ParameterTool 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint} 
import org.apache.flink.streaming.api.windowing.time.Time 

import scala.collection.JavaConverters._ 


////////////////////////////////////////////////////// 
// Create an Endpoint to Track our terms 
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable { 
    @Override 
    def createEndpoint(): StreamingEndpoint = { 
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0)) 
    val endpoint = new StatusesFilterEndpoint() 
    //endpoint.locations(List(chicago).asJava) 
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava) 
    endpoint 
    } 
} 

object Connection { 
    def main(args: Array[String]): Unit = { 

    val props = new Properties() 

    val params: ParameterTool = ParameterTool.fromArgs(args) 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    env.getConfig.setGlobalJobParameters(params) 
    env.setParallelism(params.getInt("parallelism", 1)) 

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key")) 
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key")) 
    props.setProperty(TwitterSource.TOKEN, params.get("token")) 
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret")) 

    val source = new TwitterSource(props) 
    val epInit = new myFilterEndpoint() 

    source.setCustomEndpointInitializer(epInit) 

    val streamSource = env.addSource(source) 

    streamSource.map(s => (0, 1)) 
     .keyBy(0) 
     .timeWindow(Time.minutes(2), Time.seconds(30)) 
     .sum(1) 
     .map(t => t._2) 
     .writeAsText(params.get("output")) 

    env.execute("Twitter Count") 
    } 
} 

的一點是,我沒有錯誤消息,我可以在我的儀表盤看到的。我的源是發送數據到我的TriggerWindow。但它沒有收到任何數據: enter image description here

我有兩個問題在一次。

第一:爲什麼我的源發送字節到我的TriggerWindow如果沒有收到任何東西?

Seccond:我的代碼有些問題,我無法從twitter獲取數據?

+0

第一次結果應該在2分鐘後寫出(即窗口的長度)。你等了那麼久嗎? TriggerWindow已經收到了數據,但是在43s之後,肯定不會有任何東西寫入文件。你的代碼看起來不錯。 –

+0

嗨@DawidWysakowicz,是的,我等了那麼久。隨便我運行這個代碼2個小時。我爲這個問題拍了照片。但是Flink沒有輸出:( –

回答

1

您的應用程序源沒有發送實際的記錄,你可以通過查看記錄看到窗口發送列。發送的字節屬於Flink不時在任務之間發送的控制消息。更具體地說,它是用於測量Flink作業的端到端延遲的LatencyMarker消息。

該代碼看起來不錯。我甚至試過你的代碼併爲我工作。因此,我得出結論:Twitter連接證書必須有問題。請重新檢查您是否輸入了正確的憑證。