
I want to create a DataFrame from a text file in Spark, but it throws an error. Here is my code:

case class BusinessSchema(business_id: String, name: String, address: String, city: String, postal_code: String, latitude: String, longitude: String, phone_number: String, tax_code: String, 
business_certificate: String, application_date: String, owner_name: String, owner_address: String, owner_city: String, owner_state: String, owner_zip: String) 

val businessDataFrame = sc.textFile(s"$baseDir/businesses_plus.txt").map(x => x.split("\t")).map {
    case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
      BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip)
}

val businessRecords = businessDataFrame.toDF() 

And the error occurs when I run this code:

businessRecords.take(20) 

The error thrown:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 25, localhost): scala.MatchError: [Ljava.lang.String;@6da1c3f1 (of class [Ljava.lang.String;) 

Answer


MatchError means that pattern matching failed: none of the cases matched some input. In this case you have a single case, which matches the result of split("\t") against an array of exactly 16 elements.

Your data probably contains records that don't fit this assumption (rows with fewer or more than 16 tab-separated fields), which is what causes this exception.
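
A quick way to see this (a minimal sketch with made-up input, not your data): a match whose only case covers one array length throws scala.MatchError for any other length:

// made-up row with only 2 fields; the single case covers 3-element arrays,
// so the 2-element array falls through and map-style matching blows up
"a\tb".split("\t") match {
    case Array(x, y, z) => println(s"$x $y $z")
}
// => scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)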

To overcome this, you can either replace the map with collect(f: PartialFunction[T, U]), which takes a PartialFunction and silently ignores inputs that match no case; this simply filters out all the bad records:

sc.textFile(s"$baseDir/businesses_plus.txt").map(x => x.split("\t")).collect {
    case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
      BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip)
}
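
With collect, the mismatched rows are simply dropped, so the rest of your original code works unchanged. For example (cleaned is a hypothetical name for the RDD produced by the collect above):

// 'cleaned' is a hypothetical name for the RDD returned by the collect above
val businessRecords = cleaned.toDF()
businessRecords.take(20)  // no MatchError: the bad rows were dropped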

Or, catch the bad records and do something with them; for example, you can replace the RDD[BusinessSchema] result with an RDD[Either[BusinessSchema, Array[String]]] to reflect the fact that some records failed to parse, while keeping the bad data available for logging or other handling:

import org.apache.spark.rdd.RDD

val withErrors: RDD[Either[BusinessSchema, Array[String]]] = sc.textFile(s"$baseDir/businesses_plus.txt")
    .map(x => x.split("\t"))
    .map {
      case Array(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip) =>
        Left(BusinessSchema(business_id, name, address, city, postal_code, latitude, longitude, phone_number, tax_code, business_certificate, application_date, owner_name, owner_address, owner_city, owner_state, owner_zip))
      case badArray => Right(badArray)
    }

// extract the bad records; you can log/count/ignore them
val badRecords: RDD[Array[String]] = withErrors.collect { case Right(a) => a }

// extract the good records; you can go on as planned from here...
val goodRecords: RDD[BusinessSchema] = withErrors.collect { case Left(r) => r }
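
From here you could, for example, log how many records failed to parse and then continue with the good ones (a sketch, assuming a simple count is enough for your case):

// surface how many rows failed to parse, then carry on with the original plan
println(s"Failed to parse ${badRecords.count()} records")
val businessRecords = goodRecords.toDF()
businessRecords.take(20)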