2017-09-01 112 views
1

所以,基本是:遍歷文件在pySpark目錄自動數據幀和SQL表創建

  • 我在星火2 +
  • 我跑這一切在Jupyter筆記本
  • 我的目標是遍歷目錄中的許多文件,並具有spark(1)創建數據幀和(2)將這些數據幀轉換爲sparkSQL表。基本上,我希望能夠隨時打開筆記本電腦,並擁有一個乾淨的方式來始終加載可用的所有內容。

下面是我進口:

from pyspark.sql.functions import * 
from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

fileDirectory = 'data/' 

下面是實際的代碼:

for fname in os.listdir(fileDirectory): 
    sqlContext.read.format("csv").\ 
      option("header", "true").\ 
      option("inferSchema", "true").\ 
      load(fname) 

    df_app = app_dat_df 
    df_app.createOrReplaceTempView(fname) 

但我發現了以下錯誤消息:

AnalysisException: u'Unable to infer schema for CSV. It must be specified manually.;' 

會似乎沒有發現問題的方式,我傳遞的文件(偉大),但它不會讓我推斷模式。當我手動檢查每個文件時,這從來都不是問題。

有人可以給我一些指示我可以改善他們/讓它運行?

很多,非常感謝!

+0

確定的路徑是正確的?你想訪問本地文件系統並且你的工作目錄是'data /'? 'fname'只是文件的名稱,不是它的完整路徑。如果問題來自一個文件,您應該在您的循環中添加一個打印以查看哪一個是問題 – MaFF

+0

好點。我忘了提到這一點,但是,是的,路徑和所有這些都是正確的。如果我按照文件運行以下代碼,它可以正常工作: 'df_name = sqlContext.read.format(「csv」)。option(「header」,「true」)。option(「inferSchema」,「true 「)\ .load(」 數據/ file_name.csv 「)' 'DF = df_name' 'df.createOrReplaceTempView(」 df_name「)' – Berzerkeley

+1

所以你的工作目錄是不是數據是數據的父目錄。在你的代碼中,你直接訪問'transaction_dat.csv'。試試'fileDirectory + fname'而不是 – MaFF

回答

0

由於inferSchema出現錯誤,因此應手動指定csv數據的模式。

另外@Marie已經提到你需要稍微修改你的加載語法。

from pyspark.sql.types import * 

customSchema = StructType([ 
    StructField("string_col", StringType(), True), 
    StructField("integer_col", IntegerType(), True), 
    StructField("double_col", DoubleType(), True)]) 

fileDirectory = 'data/' 
for fname in os.listdir(fileDirectory): 
    df_app = sqlContext.read.format("csv").\ 
     option("header", "true"). \ 
     schema(customSchema). \ 
     load(fileDirectory + fname) 

希望這會有所幫助!


不要忘了讓我們知道是否能解決你的問題:)

+0

@Berzerkeley如果它解決了您的問題,您應該將答案標記爲正確答案,因爲如果將來遇到類似問題,這肯定會有所幫助。謝謝! – Prem