
Spark Streaming Kinesis integration: error while initializing LeaseCoordinator in Worker

I'm running into some issues getting a simple vanilla Spark Streaming application with Kinesis working in Scala. I followed the basic guidance from a few tutorials, such as the Snowplow and WordCountASL examples.

But I still can't get it to work, because of this Kinesis worker error:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator 
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318) 
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174) 
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822) 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118) 
    ... 4 more 
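
As I understand it, when no credentials are handed to it explicitly, the KCL worker falls back to the AWS SDK's default provider chain, and the exception above means every link in that chain came up empty. A minimal standalone sketch of that lookup (my assumption, using the AWS Java SDK v1 classes already on the classpath):

import com.amazonaws.AmazonClientException
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

// Sketch: reproduce the lookup the KCL worker performs internally.
// The chain checks environment variables, JVM system properties, the
// shared credentials file and the EC2 instance profile, in order; if
// all fail, it throws the AmazonClientException seen in the trace above.
object CredentialsChainCheck {
  def main(args: Array[String]): Unit = {
    try {
      val creds = new DefaultAWSCredentialsProviderChain().getCredentials
      println(s"Resolved access key id: ${creds.getAWSAccessKeyId}")
    } catch {
      case e: AmazonClientException =>
        println(s"No credentials found in the chain: ${e.getMessage}")
    }
  }
}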

Here is my code sample:

import com.amazonaws.auth.BasicAWSCredentials 
import com.amazonaws.internal.StaticCredentialsProvider 
import com.amazonaws.regions.RegionUtils 
import com.amazonaws.services.kinesis.AmazonKinesisClient 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kinesis.KinesisUtils 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Milliseconds, StreamingContext} 

/** 
    * Created by franco on 11/11/16. 
    */ 
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" // e.g. "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {
    // Static credentials for the Kinesis client used to inspect the stream
    val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    // One receiver (DStream) per shard in the stream
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
    val streams = shards

    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores: Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    val config = new SparkConf().setAppName(appName).setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}

My IAM policy looks like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:region:account:stream/name"
      ]
    },
    {
      "Sid": "Stmt456",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DeleteItem",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:region:account:table/name"
      ]
    },
    {
      "Sid": "Stmt789",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}

I can't figure out what's wrong with this application. Any guidance on this issue would be greatly appreciated.

Answers

Answer 1:

There are other DStream constructors that let you pass in the AWS access key and secret key.

For example, the 1st and 5th constructors in the link below let you pass the credentials in the constructor itself (and they should get passed through the system for you), rather than having to set system properties:

KinesisUtil Constructors
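
Concretely, adapting the question's code to that overload might look like the sketch below (assuming the Spark 1.6/2.0-era spark-streaming-kinesis-asl signature where awsAccessKeyId and awsSecretKey are the trailing parameters; double-check against your Spark version):

// Inside main(), replacing the original createStream call; the trailing
// key arguments hand the credentials to the KCL worker (and its DynamoDB
// and CloudWatch clients) instead of relying on the default chain.
val kinesisStreams = (0 until streams).map { i =>
  KinesisUtils.createStream(
    ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
    InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2,
    StorageLevel.MEMORY_AND_DISK_2,
    awsAccessKeyId, awsSecretKey)
}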

Answer 2:

I eventually got it working by setting the credential values as system properties:

System.setProperty("aws.accessKeyId","XXXXXX") 
System.setProperty("aws.secretKey","XXXXXX") 

But I'm not entirely happy with this solution.

Do you see any problems with this approach?