
Spark Streaming: how do I load a Pipeline on a stream?

I am implementing a lambda architecture system for stream processing.

I have no problem creating a Pipeline with GridSearch in Spark batch:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor]) 

paramGrid = (
    ParamGridBuilder()
    .addGrid(logistic_regressor.regParam, (0.01, 0.1))
    .addGrid(logistic_regressor.tol, (1e-5, 1e-6))
    ...etcetera
).build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=4)

pipeline_cv = cv.fit(raw_train_df) 
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df) 
model_fitted.write().overwrite().save("pipeline") 

However, I can't seem to find how to plug the Pipeline into Spark Streaming. I am using Kafka as the DStream source, and my code as of now is as follows:

import json 
from pyspark.ml import PipelineModel 
from pyspark.streaming.kafka import KafkaUtils 

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1) 
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"kafka_topic": 1})

model = PipelineModel.load('pipeline/') 
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1])) 

CODE MISSING GOES HERE  

ssc.start() 
ssc.awaitTermination() 

and that is where I now need to find some way of applying the model.

According to the documentation here (even though it looks very, very outdated), it seems your model needs to implement the method predict in order to use it on an RDD (and, hopefully, on a KafkaStream?).
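For reference, the RDD-style pattern those docs seem to describe would look roughly like the sketch below, where mllib_model stands for an assumed old-style pyspark.mllib model exposing predict, and extract_features is a hypothetical helper that turns a parsed record into a feature vector:

# rough sketch of the RDD-era pattern the old docs describe (mllib_model and
# extract_features are assumed here, they are not part of the code above)
feature_stream = parsed_stream.map(extract_features)
predictions = feature_stream.transform(lambda rdd: mllib_model.predict(rdd))
predictions.pprint()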

How could I use the Pipeline in the Streaming context? The reloaded PipelineModel only seems to implement transform.

Does this mean the only way to use batch models in a Streaming context is to use plain models, and no Pipelines?
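(For clarity, a minimal sketch of what the reloaded model does offer, reusing the DataFrames from the batch code above: transform works on a DataFrame, but nothing analogous accepts a DStream.)

# the reloaded PipelineModel only exposes transform(), which takes a DataFrame
model = PipelineModel.load('pipeline/')
scored_df = model.transform(raw_validation_df)   # fine in batch
# model.transform(kafkaStream)                   # no such thing for a DStream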

Answer


I found a way to load a Spark Pipeline into Spark Streaming.

The solution works for Spark 2.0; other versions may well come with a better way of doing this.

The solution I found converts the streaming RDDs into DataFrames using the toDF() method, and then applies pipeline.transform to the result.

This way of doing things is very inefficient, though.

# we load the required libraries
from pyspark.ml import PipelineModel
from pyspark.sql.types import (
    StructType, StringType, StructField, LongType
)
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# we specify the dataframe's schema, so Spark does not have to infer it from the data
pipeline_schema = StructType(
    [
        StructField("field1", StringType(), True),
        StructField("field2", StringType(), True),
        StructField("field3", LongType(), True)
    ]
)

# we load the pipeline saved with Spark batch
pipeline = PipelineModel.load('/pipeline')

# set up the usual Spark context and Spark Streaming context
sc = spark.sparkContext
ssc = StreamingContext(sc, 1)

# in my case I use a Kafka direct stream as the DStream source
directKafkaStream = KafkaUtils.createDirectStream(ssc, [QUEUE_NAME], {"metadata.broker.list": "localhost:9092"})

def handler(req_rdd):
    def process_point(p):
        # here goes the logic to apply after the pipeline has scored the point
        print(p)
    if req_rdd.count() > 0:
        # here is the gist of it: we turn the rdd into Rows, then into a df with the specified schema
        req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema)
        # now we can apply the transform, yaaay
        pred = pipeline.transform(req_df)
        records = pred.rdd.map(lambda p: process_point(p)).collect()
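
For this to actually run, the handler still has to be attached to the stream and the context started; a minimal sketch, assuming the Kafka values are JSON documents as in the question:

# assumed wiring, not shown in the answer above: parse the Kafka values,
# attach the handler to every micro-batch, and start the streaming context
import json
directKafkaStream.map(lambda x: json.loads(x[1])).foreachRDD(handler)
ssc.start()
ssc.awaitTermination()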

Hope this helps.