2017-07-19 51 views
0
val sc = new SparkContext(conf) 

val streamContext = new StreamingContext(sc, Seconds(1)) 

val log = Logger.getLogger("sqsLog") 
val sqs = streamContext.receiverStream(new SQSReceiver("queue") 
    .at(Regions.US_EAST_1) 
    .withTimeout(5)) 


val jsonRows = sqs.mapPartitions(partitions => { 
    val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))) 

    val txfm = new LogLine2Json 
    val log = Logger.getLogger("parseLog") 
    val sqlSession = SparkSession 
    .builder() 
    .getOrCreate() 

    val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/") 
    val parsedDate = parsedFormat.format(new java.util.Date()) 
    val outputPath = "/tmp/spark/presto" 

    partitions.map(messages => { 
    val sqsMsg = Json.parse(messages) 
    System.out.println(sqsMsg) 

    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") 
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "") 
    System.out.println(bucketName) 
    System.out.println(key) 
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key)) 
    val stream = obj.getObjectContent() 
    scala.io.Source.fromInputStream(stream).getLines().map(line => { 
     try{ 
      val str = txfm.parseLine(line) 
      val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str) 
      jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
     } 
     catch { 
      case e: Throwable => {log.info(line); "";} 
     } 
     }).filter(line => line != "{}") 
    }) 
}) 

streamContext.start() 
streamContext.awaitTermination() 

我的工作非常簡單,我們從SQS獲取一個S3密鑰。該文件的內容是nginx日誌,我們使用我們的解析器解析這是工作文件。 LogLine2Json它將日誌轉換爲JSON格式,然後我們將其寫入orc格式。Spark沒有輸出操作註冊,所以沒什麼可執行的,但我正在寫入文件

但我發現了這個錯誤

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at SparrowOrc$.main(sparrowOrc.scala:159) 
    at SparrowOrc.main(sparrowOrc.scala) 

據我所知,星火需要一個動作,否則將無法正常工作。但是我有這個代碼寫入一個orc文件。我不確定我是否需要做其他事情?

jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
+0

在初始數據幀中應該有一連串的調用導致write或writeStream調用。你確實寫了一個數據框,但是因爲這是一個副作用,所以沒有理由開始處理初始數據框。經典I/O很難使用,因爲它在火花建立的處理鏈之外工作。 Spark的hadoop操作與S3兼容,我相信:https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html –

回答

0

首先map不是一個動作。這是一個轉變。 Spark沒有理由執行此代碼。

接下來,您應該避免轉換中的副作用,如果需要輸出的正確性,則不應使用這些副作用。

最後使用標準io在分佈式系統中的功能通常沒有意義。

總的來說,你應該檢討現行的選項DStream片,如果沒有這些是適合您的方案,使用動作(foreachforeachPartition)寫自己的。

+0

你能解釋一下爲什麼使用io函數沒有意義嗎? – toy

相關問題