I'm having some trouble running a simple vanilla Spark Streaming application with Kinesis. I followed the basic directions in some tutorials, such as SnowplowWordCountASL and the Spark Streaming Kinesis integration guide.

But I still can't get it to work; the Kinesis Worker fails while initializing the LeaseCoordinator:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator 
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374) 
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318) 
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174) 
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758) 
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822) 
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118) 
    ... 4 more 
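
(The stack trace shows that it is the KCL Worker's DynamoDB client, while creating the lease table, that fails to load credentials from the SDK's default provider chain.) As a standalone diagnostic, here is a minimal sketch to check whether that chain can resolve anything at all on the machine running the driver:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain

// Throws the same "Unable to load AWS credentials from any provider in the
// chain" AmazonClientException when no environment variables, system
// properties, credentials profile, or instance-profile credentials exist.
val chainCredentials = new DefaultAWSCredentialsProviderChain().getCredentials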

Here is my code:

import com.amazonaws.auth.BasicAWSCredentials 
import com.amazonaws.internal.StaticCredentialsProvider 
import com.amazonaws.regions.RegionUtils 
import com.amazonaws.services.kinesis.AmazonKinesisClient 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kinesis.KinesisUtils 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Milliseconds, StreamingContext} 

/** 
    * Created by franco on 11/11/16. 
    */ 
object TestApp { 
    // === Configurations for Kinesis streams === 
    val awsAccessKeyId = "XXXXXX" 
    val awsSecretKey = "XXXXXXX" 
    val kinesisStreamName = "MyStream" 
    val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com" 
    val appName = "MyAppName" 

    def main(args: Array[String]): Unit = {

        val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
        val provider = new StaticCredentialsProvider(credentials)

        val kinesisClient = new AmazonKinesisClient(provider)
        kinesisClient.setEndpoint(kinesisEndpointUrl)

        // One receiver (DStream) per shard
        val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
        val streams = shards

        val batchInterval = Milliseconds(2000)
        val kinesisCheckpointInterval = batchInterval

        val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

        val cores: Int = Runtime.getRuntime.availableProcessors()
        println("Available Cores : " + cores.toString)
        val config = new SparkConf().setAppName(appName).setMaster("local[" + (cores / 2) + "]")
        val ssc = new StreamingContext(config, batchInterval)

        // Create the Kinesis DStreams
        val kinesisStreams = (0 until streams).map { i =>
            KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
                InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
        }

        ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()

        // Start the streaming context and await termination
        ssc.start()
        ssc.awaitTermination()
    }
}

And this is my IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account:stream/name"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:UpdateItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:region:account:table/name"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

I can't figure out what is wrong with this app. Any guidance on the topic would be appreciated.

Answers

There are other constructors for the DStream that let you pass in the AWS access key and secret key.

For example, the first and fifth constructors at the link below let you pass the keys directly to the constructor, instead of having to set them as system properties (a sketch follows below).
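
For illustration, here is a minimal sketch of that variant, assuming the KinesisUtils.createStream overload from the Spark 1.6/2.x spark-streaming-kinesis-asl module that takes the access key and secret key as two extra trailing parameters:

// Same arguments as in the question, plus explicit credentials; with this
// overload the receiver should use the given keys (and pass them on to the
// KCL's DynamoDB and CloudWatch clients) instead of the default chain.
val kinesisStreams = (0 until streams).map { i =>
    KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId, awsSecretKey)
}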

In the end, I got it to work by setting the credential values as system properties:

System.setProperty("aws.accessKeyId","XXXXXX") 
System.setProperty("aws.secretKey","XXXXXX") 

However, I'm not entirely happy with this solution.

Do you think there is any problem with this approach?