
Random Forest Regression with categorical inputs on PySpark

I have been trying to build a simple random forest regression model on PySpark. I have a decent amount of machine learning experience in R. However, ML on PySpark seems completely different to me - especially when it comes to handling categorical variables, string indexing, and OneHotEncoding (I was only able to run an RF regression by following examples when all the variables were numeric). Although there are plenty of examples on handling categorical variables, such as this and this, I had no success with them, since most of them went over my head (probably because of my unfamiliarity with Python ML). I would be grateful to anyone who can help resolve this.

Here is my attempt (the input file is here):

from pyspark.mllib.linalg import Vectors 
from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer, VectorIndexer 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 
from pyspark.sql.types import Row 
from pyspark.sql.functions import col, round 
# read the input CSV with the spark-csv package and infer the schema
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('filename.csv') 
train.cache() 
train.dtypes 

The output is:

DataFrame[ID: int, Country: string, Carrier: double, TrafficType: string, ClickDate: timestamp, Device: string, Browser: string, OS: string, RefererUrl: string, UserIp: string, ConversionStatus: string, ConversionDate: string, ConversionPayOut: string, publisherId: string, subPublisherId: string, advertiserCampaignId: double, Fraud: double] 
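
As a side note, on Spark 2.x the same load can be done with the built-in CSV reader instead of the spark-csv package; a minimal sketch, assuming a SparkSession named spark is available in the shell:

# equivalent load with the built-in CSV reader (Spark 2.x); spark is the SparkSession 
train = spark.read.csv('filename.csv', header=True, inferSchema=True) 
train.cache() 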

Next, I select my variables of interest:

IMP = ["Country","Carrier","TrafficType","Device","Browser","OS","Fraud","ConversionPayOut"] 
train = train.fillna("XXX") 
train = train.select([column for column in train.columns if column in IMP]) 
from pyspark.sql.types import DoubleType 
# cast the label column from string to double 
train = train.withColumn("ConversionPayOut", train["ConversionPayOut"].cast("double")) 
train.cache() 

The output is:

DataFrame[Country: string, Carrier: double, TrafficType: string, Device: string, Browser: string, OS: string, ConversionPayOut: double, Fraud: double] 
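
Note that fillna("XXX") above only fills nulls in the string columns; numeric columns such as Carrier keep their nulls. If those also need a default, fillna accepts a per-column dict - a small sketch, with fill values that are only illustrative:

# string columns get a placeholder, numeric columns get an explicit numeric default 
train = train.fillna({"Country": "XXX", "TrafficType": "XXX", "Device": "XXX", "Browser": "XXX", "OS": "XXX", "Carrier": 0.0}) 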

My dependent variable is ConversionPayOut, which was a string and has now been cast to double.

This is where my confusion begins: from this post, I understand that I have to convert my categorical string-type variables into one-hot encoded vectors. Here is my attempt at that:

First, the StringIndexing:


from pyspark.ml import Pipeline 
from pyspark.ml.feature import StringIndexer 
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train) for column in list(set(train.columns)-set(['Carrier','ConversionPayOut','Fraud'])) ] 
pipeline = Pipeline(stages=indexers) 
train_catind = pipeline.fit(train).transform(train) 
train_catind.show() 


The output of the StringIndexing:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+ 
|Country|Carrier|TrafficType| Device|  Browser|  OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index| 
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+ 
|  TH| 20.0|   A| Lava|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   7.0| 
|  BR| 217.0|   A|  LG|  chrome|Android|  26.2680574| 0.0|    0.0|   2.0|   0.0|  0.0|   5.0| 
|  TH| 20.0|   A|Generic|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   0.0| 


Next, I think I have to do the OneHotEncoding of the string indexes: 


from pyspark.ml.feature import OneHotEncoder, StringIndexer 
indexers_ON = [OneHotEncoder(inputCol=column, outputCol=column+"_Vec") for column in filter(lambda x: x.endswith('_index'), train_catind.columns) ] 
pipeline = Pipeline(stages=indexers_ON) 
train_OHE = pipeline.fit(train_catind).transform(train_catind) 
train_OHE.show() 


The output after one-hot encoding looks like this:


+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+ 
|Country|Carrier|TrafficType| Device|  Browser|  OS| ConversionPayOut|Fraud|TrafficType_index|Country_index|Browser_index|OS_index|Device_index|TrafficType_index_Vec|Country_index_Vec|Browser_index_Vec| OS_index_Vec|Device_index_Vec| 
+-------+-------+-----------+-------+--------------+-------+------------------+-----+-----------------+-------------+-------------+--------+------------+---------------------+-----------------+-----------------+-------------+----------------+ 
|  TH| 20.0|   A| Lava|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   7.0|  (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[7],[1.0])| 
|  BR| 217.0|   A|  LG|  chrome|Android|  26.2680574| 0.0|    0.0|   2.0|   0.0|  0.0|   5.0|  (1,[0],[1.0])| (9,[2],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[5],[1.0])| 
|  TH| 20.0|   A|Generic|  chrome|Android|    41.6| 0.0|    0.0|   1.0|   0.0|  0.0|   0.0|  (1,[0],[1.0])| (9,[1],[1.0])| (5,[0],[1.0])|(1,[0],[1.0])| (15,[0],[1.0])| 
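
Incidentally, the vector sizes above come from the fact that OneHotEncoder drops the last category by default, so a column with k indexed levels becomes a (k-1)-dimensional sparse vector. If a full dummy slot per category is wanted, dropLast can be switched off - a minimal sketch:

# keep one slot per category instead of dropping the last one (dropLast defaults to True) 
encoder = OneHotEncoder(inputCol="TrafficType_index", outputCol="TrafficType_index_Vec", dropLast=False) 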


I am clueless about how to proceed from here. In fact, I am clueless about which Spark ML algorithms require this one-hot encoding and which do not.

It would be a great learning experience for all newbies to PySpark if the StackOverflow community could shed some light on how to move forward.

Answer


To run a random forest on your preprocessed data, you can proceed with the code below.

from pyspark.ml import Pipeline 
from pyspark.ml.feature import VectorAssembler 
from pyspark.ml.classification import RandomForestClassifier 

#use VectorAssembler to combine all the feature columns into a single vector column 
assemblerInputs = ["Carrier","Fraud","Country_index_Vec","TrafficType_index_Vec","Device_index_Vec","Browser_index_Vec","OS_index_Vec"] 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") 
pipeline = Pipeline(stages=[assembler]) 
df = pipeline.fit(train_OHE).transform(train_OHE) 
df = df.withColumn("label", df["ConversionPayOut"]) 

#randomly split data into training and test dataset 
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed = 111) 

# train RandomForest model 
rf = RandomForestClassifier(labelCol="label", featuresCol="features") 
rf_model = rf.fit(train_data) 

# Make predictions on test data 
predictions = rf_model.transform(test_data) 
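
Since ConversionPayOut is a continuous value, the regression counterpart of the above would use RandomForestRegressor instead of the classifier; a minimal sketch along the same lines (the evaluator settings are only illustrative):

from pyspark.ml.regression import RandomForestRegressor 
from pyspark.ml.evaluation import RegressionEvaluator 

# random forest regression on the same assembled features 
rf_reg = RandomForestRegressor(labelCol="label", featuresCol="features") 
rf_reg_model = rf_reg.fit(train_data) 
predictions = rf_reg_model.transform(test_data) 

# evaluate with RMSE on the held-out split 
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse") 
print(evaluator.evaluate(predictions)) 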


Hope this helps!


Thank you for the answer. This is similar to what I had tried. But I ran into new errors after running the VectorAssembler. Could you please take a look at this question? https://stackoverflow.com/questions/46377686/how-to-match-and-replace-in-pyspark-when-columns-contain-vectors – kasa


@kasa Could you please try this code and let us know if you still get the same error? – Prem