I have been trying to build a simple random forest regression model in PySpark. I have good machine-learning experience in R, but ML in PySpark seems completely different to me, especially when it comes to handling categorical variables with string indexing and one-hot encoding (when all the variables were numeric, I was able to perform RF regression just by following examples). While there are many examples available for handling categorical variables, such as this and this, I had no success with them, as most of them went over my head (probably because of my unfamiliarity with ML in Python). I would be grateful to anyone who can help resolve this.
Random forest regression with categorical input on PySpark
Here is my attempt (the input file is here):
`
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import Row
from pyspark.sql.functions import col, round
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('filename.csv')
train.cache()
train.dtypes
`
The output is:
DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double]
Next, I select my variables of interest:
IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"]
train = train.fillna("XXX")
train = train.select([column for column in train.columns if column in IMP])
from pyspark.sql.types import DoubleType
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double"))
train.cache()
The output is:
DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double]
My dependent variable is ConversionPayOut, which was previously a string type and is now cast to double.
This is where my confusion begins: following this post, I understand that I have to convert my categorical string-typed variables into one-hot encoded vectors. Here is my attempt at that:
First, the StringIndexing:
`
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud']))]
pipeline = Pipeline(stages=indexers)
train_catind = pipeline.fit(train).transform(train)
train_catind.show()
`
The output of the StringIndexing:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0|
|     TH|   20.0|          A|Generic|        chrome|Android|              41.6|  0.0|              0.0|          1.0|          0.0|     0.0|         0.0|
`
Next, I think I have to do the one-hot encoding of the string indexes:
`
from pyspark.ml.feature import OneHotEncoder, StringIndexer
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ]
pipeline = Pipeline(stages=indexers_ON)
train_OHE = pipeline.fit(train_catind).transform(train_catind)
train_OHE.show()
`
The output after one-hot encoding looks like this:
`
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
|Country|Carrier|TrafficType| Device| Browser| OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec|
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+
| TH| 20.0| A| Lava| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 7.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])|
| BR| 217.0| A| LG| chrome|Android| 26.2680574| 0.0| 0.0| 2.0| 0.0| 0.0| 5.0| (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])|
| TH| 20.0| A|Generic| chrome|Android| 41.6| 0.0| 0.0| 1.0| 0.0| 0.0| 0.0| (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])|
`
I am clueless about how to proceed from here. In fact, I am clueless about which Spark machine-learning algorithms require us to do this one-hot encoding and which do not.
It would be a great learning for all newcomers to PySpark if the StackOverflow community could shed some light on how to move forward.
Thanks for your answer. This is similar to what I tried, but after running VectorAssembler I ran into a new error. Could you please take a look at this question? https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa
@kasa Could you please try this code and let us know if you still get the same error? – Prem