
I am trying to run a multinomial logistic regression model, but my Spark pipeline fails with an error.

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('prepare_data').getOrCreate() 

from pyspark.sql.types import * 
spark.sql("DROP TABLE IF EXISTS customers") 
spark.sql("CREATE TABLE customers (
      Customer_ID DOUBLE, 
      Name STRING, 
      Gender STRING, 
      Address STRING, 
      Nationality DOUBLE, 
      Account_Type STRING, 
      Age DOUBLE, 
      Education STRING, 
      Employment STRING, 
      Salary DOUBLE, 
      Employer_Stability STRING, 
      Customer_Loyalty DOUBLE, 
      Balance DOUBLE, 
      Residential_Status STRING, 
      Service_Level STRING)") 
spark.sql("LOAD DATA LOCAL INPATH '../datasets/dummyTrain.csv' INTO TABLE 
      customers") 

dataset = spark.table("customers") 
cols = dataset.columns 
display(dataset) 

from pyspark.ml import Pipeline 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler 

categoricalColumns = ["Education", "Employment", "Employer_Stability", 
         "Residential_Status"] 
stages = [] 

for categoricalCol in categoricalColumns: 
    stringIndexer = StringIndexer(inputCol=categoricalCol, 
     outputCol=categoricalCol+"Index") 
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", 
     outputCol=categoricalCol+"classVec") 
    stages += [stringIndexer, encoder] 

label_stringIdx = StringIndexer(inputCol = "Service_Level", outputCol = 
    "label") 
stages += [label_stringIdx] 

numericCols = ["Age", "Salary", "Customer_Loyalty", "Balance"] 
assemblerInputs = list(map(lambda c: c + "classVec", categoricalColumns)) + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") 
stages += [assembler] 

pipeline = Pipeline(stages=stages) 
pipelineModel = pipeline.fit(dataset) 
dataset = pipelineModel.transform(dataset) 
selectedcols = ["label", "features"] + cols 
dataset = dataset.select(selectedcols) 
display(dataset) 

I get the following error:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-31-07d2fb5cecc8> in <module>() 
     4 # - fit() computes feature statistics as needed 
     5 # - transform() actually transforms the features 
----> 6 pipelineModel = pipeline.fit(dataset) 
     7 dataset = pipelineModel.transform(dataset) 
     8 

/srv/spark/python/pyspark/ml/base.py in fit(self, dataset, params) 
    62     return self.copy(params)._fit(dataset) 
    63    else: 
---> 64     return self._fit(dataset) 
    65   else: 
    66    raise ValueError("Params must be either a param map or a 
list/tuple of param maps, " 

/srv/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset) 
    109      transformers.append(model) 
    110      if i < indexOfLastEstimator: 
--> 111       dataset = model.transform(dataset) 
    112    else: 
    113     transformers.append(stage) 

/srv/spark/python/pyspark/ml/base.py in transform(self, dataset, params) 
    103     return self.copy(params)._transform(dataset) 
    104    else: 
--> 105     return self._transform(dataset) 
    106   else: 
    107    raise ValueError("Params must be a param map but got 
%s." % type(params)) 

/srv/spark/python/pyspark/ml/wrapper.py in _transform(self, dataset) 
    250  def _transform(self, dataset): 
    251   self._transfer_params_to_java() 
--> 252   return DataFrame(self._java_obj.transform(dataset._jdf), 
dataset.sql_ctx) 
    253 
    254 

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/srv/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling o798.transform. 
: java.lang.NullPointerException at 

I can't figure out what I am doing wrong; the problem seems to be in the transform() method. Any help would be appreciated.


Are you sure you don't have any missing values? You are getting a NullPointerException, which suggests that one of your transformers cannot handle the data properly. – TDrabas
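A quick way to act on this suggestion, assuming dataset is the customers DataFrame built in the question (a sketch, not part of the original post), is to count the NULLs per column before fitting the pipeline:

from pyspark.sql import functions as F

# Count NULLs in every column to see which inputs the transformers choke on.
null_counts = dataset.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)
                              for c in dataset.columns])
null_counts.show()

# A blunt fix is to drop incomplete rows before calling pipeline.fit().
dataset = dataset.dropna()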


@TDrabas The dataset was returning NULL in all rows. Thanks. I have now changed my code to create a temporary view and select my columns from it. When I try to run the pipeline and create the transformations, I now get an 'IllegalArgumentException: Data type StringType is not supported.' error – maffsojah


Can you paste the debug info? VectorAssembler does require numeric columns, and it definitely looks like one of your input features is a string. Check the schema of the view you just created with printSchema() to see whether the numeric columns are not somehow being converted to strings. BTW, when you create the encoder, you might consider specifying its inputCol as StringIndexer.getOutputCol() – TDrabas
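For illustration only (the actual variable name for the DataFrame selected from the temporary view is not shown in the thread, so dataset is assumed here), the schema check and the cast could look like this:

from pyspark.sql.functions import col

# Any feature listed as string here will make VectorAssembler fail with the
# "Data type StringType is not supported" error.
dataset.printSchema()

# Cast numeric columns that were read as strings back to DOUBLE before the
# VectorAssembler stage (column names taken from the question's table).
for numericCol in ["Age", "Salary", "Customer_Loyalty", "Balance"]:
    dataset = dataset.withColumn(numericCol, col(numericCol).cast("double"))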

Answer


You need to make sure there are no missing values in your data - that is why you are getting the NullPointerException. Also, make sure that all of the VectorAssembler's input features are numeric.

BTW, when you create the encoder, you might consider specifying its inputCol as StringIndexer.getOutputCol().
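Put concretely, the loop from the question could be rewritten like this (a sketch reusing the question's column names and the Spark 2.x OneHotEncoder):

from pyspark.ml.feature import OneHotEncoder, StringIndexer

stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + "Index")
    # Chain the encoder to the indexer's output instead of rebuilding the name.
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]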