我正在實現流處理的lambda體系結構系統。Spark Streaming:如何在流上加載管道?
我沒有任何問題星火批量創建具有GridSearch管道:
pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])
paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=4)
pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")
不過,我似乎無法找到如何插上星火流處理管道。我使用的卡夫卡作爲DSTREAM源和我的代碼截至目前如下:
import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
從pyspark.streaming進口的StreamingContext
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1})
model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))
CODE MISSING GOES HERE
ssc.start()
ssc.awaitTermination()
,現在我需要找到某種方式做
的根據文檔here(儘管它看起來非常非常過時),似乎您的模型需要實現方法predict才能夠在rdd對象上使用它(並且希望在kafkastream上?)
如何在Streaming上下文中使用管道?重新加載的PipelineModel似乎只實現了transform
這是否意味着在Streaming上下文中使用批處理模型的唯一方法是使用純模型並且不使用管道?