2017-08-03 55 views
2

我在流上下文中使用Pyspark Dataframe API,我已經在我的火花流應用程序(我使用的是kafka接收器)中將RDD轉換爲DF foreach DStream,這是我已經在我的過程RDD函數來完成:從數據庫獲取每行數據DataFrame Pyspark

rowRdd = data_lined_parameters.map(
     lambda x: Row(SYS=x[0], METRIC='temp', SEN=x[1], OCCURENCE=x[2], THRESHOLD_HIGH=x[3], OSH=x[4], OSM=x[5], OEH=x[6], OEM=x[7],OSD=x[8],OED=x[9],REMOVE_HOLIDAYS=x[10],TS=x[11],VALUE=x[12],DAY=x[13],WEEKDAY=x[14],HOLIDAY=x[15])) 
rawDataDF = sqlContext.createDataFrame(rowRdd) 

rawDataRequirementsCheckedDF = rawDataDF.filter("WEEKDAY <= OED AND WEEKDAY >=OSD AND HOLIDAY = false VALUE > THRESHOLD_HIGH ") 

我的下一個步驟是從HBase的表豐富了我的rawDataRequirementsCheckedDF每一行與新的欄目,我的問題是,什麼是最有效的方式來獲得HBase的數據(鳳凰)並將其加入到我的原始數據幀中:

--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+ 
|     DAY|HOLIDAY|METRIC|OCCURENCE|OED|OEH|OEM|OSD|OSH|OSM|REMOVE_HOLIDAYS|SEN|    SYS|THRESHOLD_HIGH|     TS| VALUE|WEEKDAY| 
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+ 
|2017-08-03 00:00:...| false| temp|  3| 4| 19| 59| 0| 8| 0|   TRUE| 1|0201|   26|2017-08-03 16:22:...|28.4375|  3| 
|2017-08-03 00:00:...| false| temp|  3| 4| 19| 59| 0| 8| 0|   TRUE| 1|0201|   26|2017-08-03 16:22:...|29.4375|  3| 
+--------------------+-------+------+---------+---+---+---+---+---+---+---------------+---+----------------+--------------+--------------------+-------+-------+ 

hbase表主鍵是DAY,SYS,SEN,因此它將導致具有相同格式的數據幀。

編輯:

這是我到目前爲止已經試過:

sysList = rawDataRequirementsCheckedDF.map(lambda x : "'"+x['SYS']+"'").collect() 
df_sensor = sqlContext.read.format("jdbc").option("dbtable","(select DATE,SYSTEMUID,SENSORUID,OCCURENCE from ANOMALY where SYSTEMUID in ("+','.join(sysList)+"))").option("url", "jdbc:phoenix:clustdev1:2181:/hbase-unsecure").option("driver", "org.apache.phoenix.jdbc.PhoenixDriver").load() 
df_anomaly = rawDataRequirementsCheckedDF.join(df_sensor, col("SYS") == col("SYSTEMUID"), 'outer') 

回答

1

一個簡單的方法,我從HBase的帶來的數據創建表成鳳,然後加載到火花。這是在Apache鳳凰頁的阿帕奇星火插件部分

df = sqlContext.read \ 
.format("org.apache.phoenix.spark") \ 
.option("table", "TABLE1") \ 
.option("zkUrl", "localhost:2181") \ 
.load() 

鏈接到Apache星火插件:https://phoenix.apache.org/phoenix_spark.html

+0

謝謝您的回答,我可能不會一直在清楚我的問題,但我的問題是,我的sql請求中使用的參數是從我的rawDataRequirementsCheckedDF發出的,我需要從我的hbase表中獲得foreach SYS數據 – azelix

+0

我試圖做類似於: sysList = rawDataRequirementsCheckedDF.map(lambda x:「'」+ x ['SYS' ] +「'」)。collect() df_sensor = sqlContext.read.format(「jdbc」)。選項(「dbtable」,「(從select中選擇OCCURENCE where SYSTEMUID in(」+','。join(sysList )+「))」)。option(「url」,「jdbc:phoenix:dev1:2181:/ hbase-unsecure」)。option(「driver」,「org.apache.phoenix.jdbc.PhoenixDriver」)。load () 但我不確定這是否是最好的方法 – azelix