2017-02-21

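Spark Streaming: Using external data during stream transformation

I have a situation where I need to filter the data points in a stream based on conditions that involve looking up external reference data. I loaded the external data into a DataFrame (so that I can query it through the SQL interface). But when I try to query the DataFrame, I find that it is not accessible inside the transformation (filter) function. (Sample code below.)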

    // DStream is created and a temp table called 'locations' is registered
    dStream.filter(dp => {
      val responseDf = sqlContext.sql("select location from locations where id='001'")
      responseDf.show() // nothing is displayed
      // some condition evaluation using responseDf
      true
    })

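Am I doing something wrong? If so, what would be a better way to load external data into memory and query it during the stream transformation stage?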

Answer


Using SparkSession instead of SQLContext solved the problem. The code is below:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.DStream

    // Load the external data once and expose it to SQL as the temp view 'locations'
    val sparkSession = SparkSession.builder().appName("APP").getOrCreate()
    val df = sparkSession.createDataFrame(locationRepo.getLocationInfo, classOf[LocationVO])
    df.createOrReplaceTempView("locations")

    val dStream: DStream[StreamDataPoint] = getdStream()

    dStream.filter(dp => {
      // getOrCreate() returns the existing session if one is available in this JVM
      val sparkAppSession = SparkSession.builder().appName("APP").getOrCreate()
      val responseDf = sparkAppSession.sql("select location from locations where id='001'")
      responseDf.show() // this prints the results
      // some condition evaluation using responseDf
      true
    })
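
As a possible alternative for the "load external data into memory and query it" part of the question: the function passed to filter runs on the executors, while SparkSession and DataFrames are driver-side objects, so a query issued inside the closure may not behave the same on a cluster as it does in local mode. When the lookup table is small, one common pattern is to collect it on the driver and ship it to the executors as a broadcast variable. The following is only a minimal sketch under assumptions: `DataPoint` is a hypothetical stand-in for `StreamDataPoint`, the helper names are made up for illustration, and the `id` and `location` columns of the `locations` view are assumed to be strings.

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Hypothetical stand-in for the StreamDataPoint type used above.
case class DataPoint(id: String, value: Double)

object BroadcastLookupFilter {

  // Pure predicate: keep a point only if its id has a known location.
  def hasKnownLocation(dp: DataPoint, locations: Map[String, String]): Boolean =
    locations.contains(dp.id)

  // Runs on the driver: query the temp view once, collect the (assumed small)
  // result, and broadcast it as an in-memory Map.
  // Assumes both columns are non-null strings.
  def broadcastLocations(spark: SparkSession): Broadcast[Map[String, String]] = {
    val rows = spark.sql("select id, location from locations").collect()
    val byId = rows.map(r => r.getString(0) -> r.getString(1)).toMap
    spark.sparkContext.broadcast(byId)
  }

  // The closure captures only the broadcast handle, not a session or DataFrame.
  def filterKnown(stream: DStream[DataPoint],
                  locations: Broadcast[Map[String, String]]): DStream[DataPoint] =
    stream.filter(dp => hasKnownLocation(dp, locations.value))
}
```

On the driver this would be wired up roughly as `BroadcastLookupFilter.filterKnown(stream, BroadcastLookupFilter.broadcastLocations(sparkSession))`. The trade-off is that the lookup data is fixed at the moment it is broadcast; if the external data changes while the stream is running, it would have to be re-broadcast, which this sketch does not cover.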