2017-09-14 57 views
0

大家好我想做增量數據查詢。在Pyspark中增量數據加載和查詢而不重新啓動Spark JOB

df = spark .read.csv('csvFile', header=True) #1000 Rows 
    df.persist() #Assume it takes 5 min 
    df.registerTempTable('data_table') #or createOrReplaceTempView 
    result = spark.sql('select * from data_table where column1 > 10') #100 rows 
    df_incremental = spark.read.csv('incremental.csv') #200 Rows 
    df_combined = df.unionAll(df_incremental) 
    df_combined.persist() #It will take morethan 5 mins, I want to avoid this, because other queries might be running at this time 
    df_combined.registerTempTable("data_table") 
    result = spark.sql('select * from data_table where column1 > 10') # 105 Rows. 
  1. 讀一個csv/MySQL的表數據到數據幀火花。

  2. 堅持認爲數據幀只在內存中(原因是:我需要表現&我的數據集可以適合到內存)

  3. 註冊爲臨時表和運行火花SQL查詢。 #我的火花工作已經結束並且正在運行。

  4. 第二天我將收到增量數據集(在temp_mysql_table或csv文件中)。現在我想在Total set i:e persisted_prevData + recent_read_IncrementalData上運行相同的查詢。我會稱之爲mixedDataset。 ***沒有把握,當增量數據進入系統時,它每天可以達到30次。

  5. 直到這裏我也不想讓火花應用程序被關閉。它應該始終是Up。而且我需要以相同的時間衡量來查詢mixedDataset,就好像它是持久的一樣。

我的顧慮:

  1. 在P4,我是否需要unpersist的prev_data並再次堅持上一頁& Incremantal數據的工會非數據幀?
  2. 而我最重要的擔心是我不想重新啓動Spark-JOB來加載/啓動更新數據(只有當服務器出現故障時,我必須重新啓動)。

因此,在高層次上,我需要動態查詢(更快的性能)數據集+ Incremnatal_data_if_any。

目前我正在通過爲所有數據創建一個文件夾並將增量文件放置在同一目錄中來完成此練習。每2-3個小時,我重新啓動服務器,我的sparkApp開始讀取該系統中存在的所有csv文件。然後查詢在它們上運行。

並試圖探索配置單元persistentTable和Spark Streaming,將在此處更新,如果發現任何結果。

請建議我一種方法/體系結構來實現這一點。

請發表評論,如果有什麼不上問清楚,不downvoting吧:)

感謝。

回答

1

嘗試流相反,它會快得多,因爲會話已經在運行,它會觸發每次你把文件夾中的內容:

df_incremental = spark \ 
    .readStream \ 
    .option("sep", ",") \ 
    .schema(input_schema) \ 
    .csv(input_path) 

df_incremental.where("column1 > 10") \ 
    .writeStream \ 
    .queryName("data_table") \ 
    .format("memory") \ 
    .start() 

spark.sql("SELECT * FROM data_table).show() 
相關問題