2017-10-01

I have a nicely formatted text file, like the one below. Can we load a delimited text file into a Spark DataFrame without creating a schema?

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!| 
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!| 
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!| 
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!| 
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!| 
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!| 
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!| 
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!| 
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!| 
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!| 
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!| 
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!| 
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!| 
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!| 
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!| 

Now I have to load this text file into a Spark DataFrame.

I can do it like this:

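import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hand-written schema: one StructField per column in the file
val schema = StructType(Array(
     StructField("OrgId", StringType),
     StructField("LineItemId", StringType),
     StructField("SegmentId", StringType),
     StructField("SequenceId", StringType),
     StructField("Action", StringType)))

val textRdd1 = sc.textFile("s3://trfsdisu/SPARK/Text1.txt")
// Split on the literal "|^|" separator; -1 keeps trailing empty fields
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
val df1 = sqlContext.createDataFrame(rowRdd1, schema)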

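But this way I have to create the schema myself, so if, for example, I have a text file with 100 columns, I have to write a StructField 100 times.

So I need to load the file the way I would load a CSV file: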

val df1 = spark.read.format("csv").option("header", "true").option("mode", "DROPMALFORMED").load("s3://sdi/SPARK/FinancialLineItem/MAIN") 

But this doesn't work for me, because my text file is not in CSV format.

So is there any way to load a text file into a Spark DataFrame in CSV style?

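import org.apache.spark.sql.functions.when
import org.apache.spark.sql.types.DataTypes
import spark.implicits._

val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer") 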
     .select($"LineItem_organizationId", $"LineItem_lineItemId",$"DataPartiotion", 
     when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"), 
     when($"LineItemName_1".isNotNull, $"LineItemName_1").otherwise($"LineItemName").as("LineItemName"), 
     when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"), 
     when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"), 
     when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"), 
     when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"), 
     when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"), 
     when($"LineItemSequence_1".isNotNull, $"LineItemSequence_1").otherwise($"LineItemSequence").as("LineItemSequence"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"), 
     when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed".cast(DataTypes.StringType)).as("IsRangeAllowed"), 
     when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin".cast(DataTypes.StringType)).as("IsSegmentedByOrigin"), 
     when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"), 
     when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"), 
     when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"), 
     when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"), 
     when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"), 
     when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"), 
     when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"), 
     when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"), 
     when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"), 
     when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"), 
     when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit".cast(DataTypes.StringType)).as("IsCredit"), 
     when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"), 
     when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"), 
     when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"), 
     when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 
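Each when(x.isNotNull, x).otherwise(y) expression above computes the same value as coalesce(x, y) (the first non-null argument), so the long select can be generated from a list of column names instead of being written out by hand. A sketch, assuming the updated values live in columns suffixed with "_1" and that treating every value as a string is acceptable (the original casts only some columns); PreferUpdated and preferUpdated are hypothetical names:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col}

object PreferUpdated {
  // Hypothetical helper: keep the key columns as-is, and for every field prefer
  // "<field>_1" (the updated side of the join), falling back to "<field>".
  // Both sides are cast to string so coalesce sees a single common type.
  def preferUpdated(df: DataFrame, keys: Seq[String], fields: Seq[String]): DataFrame = {
    val keyCols: Seq[Column] = keys.map(k => col(k))
    val merged: Seq[Column] = fields.map { f =>
      coalesce(col(s"${f}_1").cast("string"), col(f).cast("string")).as(f)
    }
    df.select((keyCols ++ merged): _*)
  }
}
```

For the code in the question this would be called on the joined DataFrame with keys LineItem_organizationId, LineItem_lineItemId and DataPartiotion and the list of field names, followed by the same .filter(!$"FFAction".contains("D")).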

Answer

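While trying to solve your problem, the first issue I ran into is that with spark-csv you can only use a single-character delimiter, not a multi-character string delimiter.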
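That limitation applies to the Spark 2.x CSV reader; from Spark 3.0 the built-in CSV reader accepts a multi-character separator. A sketch, assuming Spark 3.0+ and the layout from the question (header line first, "|!|" at the end of every record, including the header); ReadMultiCharSep is a hypothetical name:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}

object ReadMultiCharSep {
  val RecordEnd = "|!|"

  def load(spark: SparkSession, path: String): DataFrame = {
    // A multi-character "sep" is only accepted by the CSV reader in Spark 3.0+.
    val raw = spark.read
      .option("header", "true")
      .option("sep", "|^|")
      .csv(path)

    // "|!|" terminates each record, so it stays glued to the last header name
    // (e.g. "FFAction|!|") and to every value in the last column (e.g. "I|!|").
    val lastName = raw.columns.last
    val cleanName = lastName.trim.stripSuffix(RecordEnd)

    raw
      .withColumnRenamed(lastName, cleanName)
      // Strip the trailing "|!|" (plus any trailing whitespace) from the values.
      .withColumn(cleanName, regexp_replace(col(s"`$cleanName`"), "\\|!\\|\\s*$", ""))
  }
}
```

All columns come back as strings, and empty fields are read as null by default.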

The solution I found is a bit hacky: use | as the delimiter.

1: Load the data from CSV.

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

// Split on the single character "|"; the "^" between each pair of pipes becomes its own column
val df = sqlContext.read.format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .option("inferSchema", "true")
      .load("/home/robin/Bureau/Spark/csv_strange_delimiter.csv")

This creates a DataFrame that looks like this:

df.show 
+----------+---+----------+---+---------+---+----------+---+------+ 
|  OrgId| ^1|LineItemId| ^3|SegmentId| ^5|SequenceId| ^7|Action| 
+----------+---+----------+---+---------+---+----------+---+------+ 
|4295877341| ^|  136| ^|  4| ^|   1| ^|  I| 
|4295877346| ^|  136| ^|  4| ^|   1| ^|  I| 
|4295877341| ^|  138| ^|  2| ^|   1| ^|  I| 
|4295877341| ^|  141| ^|  4| ^|   1| ^|  I| 
|4295877341| ^|  143| ^|  2| ^|   1| ^|  I| 
|4295877341| ^|  145| ^|  14| ^|   1| ^|  I| 
| 123456789| ^|  145| ^|  14| ^|   1| ^|  I| 
+----------+---+----------+---+---------+---+----------+---+------+ 

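2: Drop the columns containing "^".

// Keep only the real columns; Spark renamed the duplicate "^" headers to "^1", "^3", ...
val column_to_keep = df.columns.filter(v => (!v.contains("^"))).toSeq
val result = df.select(column_to_keep.head, column_to_keep.tail: _*)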

result.show 
+----------+----------+---------+----------+------+ 
|  OrgId|LineItemId|SegmentId|SequenceId|Action| 
+----------+----------+---------+----------+------+ 
|4295877341|  136|  4|   1| I|!|| 
|4295877346|  136|  4|   1| I|!|| 
|4295877341|  138|  2|   1| I|!|| 
|4295877341|  141|  4|   1| I|!|| 
|4295877341|  143|  2|   1| I|!|| 
|4295877341|  145|  14|   1| I|!|| 
| 123456789|  145|  14|   1| I|!|| 
+----------+----------+---------+----------+------+ 
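Another option, closer to the RDD code in the question, is to keep splitting each line on the literal "|^|" yourself but build the schema from the file's header line instead of writing it by hand. A minimal sketch, assuming the first line is the header, every column can stay a string, and each record ends with "|!|"; LoadWithHeaderSchema, splitLine and load are hypothetical names:

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object LoadWithHeaderSchema {
  val FieldSep = "|^|"
  val RecordEnd = "|!|"

  // Remove surrounding whitespace and the trailing "|!|", then split on the literal
  // "|^|" separator; -1 keeps trailing empty fields.
  def splitLine(line: String): Array[String] = {
    val body = line.trim.stripSuffix(RecordEnd)
    body.split(java.util.regex.Pattern.quote(FieldSep), -1)
  }

  def load(spark: SparkSession, path: String): DataFrame = {
    val lines = spark.sparkContext.textFile(path)
    val header = lines.first()
    // One nullable StringType column per header field: no hand-written schema.
    val schema = StructType(splitLine(header).map(name => StructField(name, StringType, nullable = true)))
    val rows = lines
      .filter(_ != header) // drop the header line
      .map(line => Row.fromSeq(splitLine(line).toSeq))
    spark.createDataFrame(rows, schema)
  }
}
```

Usage: val df = LoadWithHeaderSchema.load(spark, "s3://trfsdisu/SPARK/Text1.txt"). Header names that contain a dot, such as LocalLanguageLabel.languageId, need backticks when referenced later, e.g. col("`LocalLanguageLabel.languageId`").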

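Also, could you please tell me how I can add |!| to the Action column of all the records? I have updated my code. – SUDARSHAN

You can use the concat function as described here: https://stackoverflow.com/a/31452109/6138873 – jeanr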
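A sketch of that suggestion, assuming the column is called Action as in the output above; AppendTerminator is a hypothetical name. concat returns null when any of its inputs is null, so null Action values stay null:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, concat, lit}

object AppendTerminator {
  // Append the literal "|!|" to every value of the given column (e.g. "Action").
  def appendTerminator(df: DataFrame, column: String): DataFrame =
    df.withColumn(column, concat(col(column), lit("|!|")))
}
```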
