2017-05-28 70 views
1

How to remove records where the product price is null in PySpark

I want to sort products by product price. Here is my code:

products = sc.textFile("/user/cloudera/sqoop_import/products") 

Get the product category ID from the 2nd column:

productsMap = products.map(lambda rec: (rec.split(",")[1], rec)) 

Group the product records by category ID:

productsGroupBy = productsMap.groupByKey() 

Sort by product price, converting it from string to float:

for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i) 

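The sort fails because a few records have an empty value for the product price. Is there a way to remove the records where this particular field is empty? The error log is below: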

17/05/28 00:48:25 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 6)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.5.0.0-1245/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 2, in
  File "", line 2, in
ValueError: could not convert string to float:
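The `ValueError` at the end of the log is what Python's `float()` raises for an empty string, which is what an empty price field becomes after `split(",")`. A minimal sketch in plain Python (no Spark needed) of a tolerant parser; the helper name `parse_price` and the sample records are hypothetical, not part of the question's data:

```python
def parse_price(rec, index=4):
    """Return the price at `index` as a float, or None if it cannot be parsed."""
    fields = rec.split(",")
    try:
        return float(fields[index])
    except (IndexError, ValueError):
        # Empty or non-numeric field, or a record with too few fields.
        return None


# Hypothetical sample records in the same comma-separated layout.
good = "1,2,Some Product,,59.98,http://example.com/a"
bad = "2,2,Another Product,,,http://example.com/b"

print(parse_price(good))  # 59.98
print(parse_price(bad))   # None
```

Under these assumptions, a predicate such as `lambda rec: parse_price(rec) is not None` could be passed to `products.filter(...)` before the `map` step, so that only records with a parseable price reach the sort.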


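Have you tried something like `productsGroupBy = productsMap.filter(lambda line: yourProductPriceColumn != "Null").groupByKey()` before entering the *for loop*? (Otherwise, this filter can also be applied to *productsGroupBy*.) Since you imported the values as strings, this filter gives you an RDD without the rows where productPrice is "Null", so you can then convert the strings to float. – titiro89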


Thanks a lot @titiro89!! I ran the code below and got the following result: `for i in productsMap.filter(lambda line: line.split(",")[4] == "").collect(): print(i)` ... **685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW** –


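Here, product_price has shifted to index 5 instead of index 4, because the product name itself contains a comma. So records like this should be skipped! –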
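The shift described above can be reproduced by splitting the quoted record on commas and counting the fields. This is a rough sketch only: the expected field count of 6 and the helper name `is_well_formed` are assumptions made for illustration, inferred from the record having exactly one extra comma.

```python
# Assumption: a well-formed record has 6 comma-separated fields.
EXPECTED_FIELDS = 6


def is_well_formed(rec):
    """True if the record splits into exactly the expected number of fields."""
    return len(rec.split(",")) == EXPECTED_FIELDS


# The record quoted in the comment above; its name contains "4-PW, AW".
shifted = (
    "685,31,TaylorMade SLDR Irons - (Steel) 4-PW, AW,,899.99,"
    "http://images.acmesports.sports/TaylorMade+SLDR+Irons+-+%28Steel%29+4-PW%2C+AW"
)
fields = shifted.split(",")

print(len(fields))                 # 7: one more than expected
print(repr(fields[4]), fields[5])  # '' 899.99
print(is_well_formed(shifted))     # False
```

A check like this skips the record because of its shape rather than because the field at index 4 happens to be empty, which may be a more robust criterion if other names also contain commas.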

Answers

1

We can filter out the products whose price is null, as follows.

Reference: isNotNull()

# Keep only the records whose price field (index 4) is non-empty.
# Note: isNotNull() is a DataFrame Column method; products is an RDD of strings, so the field is tested directly.
filteredProducts = products.filter(lambda rec: rec.split(",")[4] != "")
# Create a tuple (categoryId, record)
filteredProductsMap = filteredProducts.map(lambda rec: (rec.split(",")[1], rec))
# Group by categoryId
productsGroupBy = filteredProductsMap.groupByKey()
# Sort by product price within each category
for i in productsGroupBy.map(lambda rec: sorted(rec[1], key=lambda k: float(k.split(",")[4]))).collect(): print(i)
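The filter, group, and sort steps can also be tried locally on a handful of records before running them on a cluster. The following is a plain-Python stand-in for the RDD pipeline, not Spark code; the function names and sample records are hypothetical.

```python
from collections import defaultdict


def price_or_none(rec):
    """Parse the price at index 4, or return None if it is missing or invalid."""
    try:
        return float(rec.split(",")[4])
    except (IndexError, ValueError):
        return None


def sort_by_price_within_category(records):
    """Drop records without a usable price, group by category ID, sort by price."""
    groups = defaultdict(list)
    for rec in records:
        price = price_or_none(rec)
        if price is not None:                     # plays the role of filter
            category_id = rec.split(",")[1]
            groups[category_id].append((price, rec))  # plays the role of groupByKey
    return {
        cat: [rec for _, rec in sorted(pairs, key=lambda p: p[0])]
        for cat, pairs in groups.items()
    }


# Hypothetical sample data; the third record has an empty price.
sample = [
    "1,2,Product A,,59.98,http://example.com/a",
    "2,2,Product B,,19.99,http://example.com/b",
    "3,2,Product C,,,http://example.com/c",
    "4,3,Product D,,5.00,http://example.com/d",
]
result = sort_by_price_within_category(sample)
for category_id, recs in result.items():
    print(category_id, recs)
```

Parsing the price once and carrying it alongside the record avoids calling `float()` a second time inside the sort key.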

@Divyojyoti Sinha - I would be grateful if you would accept the answer. – Raja