
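AWS S3 access problem when using qubole/streamx on AWS EMR

I use qubole/streamx as a Kafka sink connector to consume data from Kafka and store it in AWS S3. I created a user in IAM with the AmazonS3FullAccess permission policy, then set its access key ID and secret access key in hdfs-site.xml, which lives in the directory specified by hadoop.conf.dir in quickstart-s3.properties.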

The configuration is as follows:

quickstart-s3.properties:

name=s3-sink 
connector.class=com.qubole.streamx.s3.S3SinkConnector 
format.class=com.qubole.streamx.SourceFormat 
tasks.max=1 
topics=test 
flush.size=3 
s3.url=s3://myemrbuckettest/raw_data 
hadoop.conf.dir=/data/java/streamx/resource/hadoop-conf 

hdfs-site.xml:

<property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>BALABALABALA</value>
</property>
<property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>balabalabala</value>
</property>

When I start the sink connector with connect-standalone ./connect-standalone.properties ./quickstart-s3.properties, I get this error:

[2017-02-14 18:30:32,943] INFO GHFS version: 1.6.0-hadoop2 (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase:597) 
[2017-02-14 18:30:36,406] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:85) 
org.apache.kafka.connect.errors.ConnectException: org.apache.hadoop.security.AccessControlException: Permission denied: s3n://myemrbuckettest/raw_data 
     at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:205) 
     at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:77) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:221) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: s3n://myemrbuckettest/raw_data 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 
     at org.apache.hadoop.fs.s3native.$Proxy42.retrieveMetadata(Unknown Source) 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:476) 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:601) 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:594) 
     at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877) 
     at com.qubole.streamx.s3.S3Storage.mkdirs(S3Storage.java:67) 
     at io.confluent.connect.hdfs.DataWriter.createDir(DataWriter.java:372) 
     at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:173) 
     ... 10 more 
Caused by: org.jets3t.service.impl.rest.HttpException 
     at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:519) 
     at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281) 
     at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942) 
     at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148) 
     at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2075) 
     at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1093) 

Answer


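The region I use is cn-north-1, so I have to specify the regional endpoint in hdfs-site.xml as shown below; otherwise the client connects to the default endpoint (s3.amazonaws.com), which does not serve the China regions. The configuration below also maps the s3:// scheme to S3AFileSystem and supplies the credentials through the fs.s3a.* keys: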

<property>
    <name>fs.s3a.access.key</name>
    <value>BALABALABALA</value>
</property>
<property>
    <name>fs.s3a.secret.key</name>
    <value>balabalabalabala</value>
</property>
<property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    <!--value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value-->
</property>
<property>
    <name>fs.s3a.endpoint</name>
    <value>s3.cn-north-1.amazonaws.com.cn</value>
</property>
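If you deploy to more than one region, it may help to generate this hdfs-site.xml rather than edit the endpoint by hand. The following is a minimal Python sketch under that assumption: the helper names s3_endpoint_for_region and build_hdfs_site are hypothetical, and the only logic added beyond the properties in the answer is the endpoint rule that China regions (cn-north-1, cn-northwest-1) sit under amazonaws.com.cn while other commercial regions sit under amazonaws.com. The output is meant to be written into the directory named by hadoop.conf.dir.

```python
import xml.etree.ElementTree as ET


def s3_endpoint_for_region(region: str) -> str:
    """Return the regional S3 endpoint host (hypothetical helper).

    Assumption: China regions use the amazonaws.com.cn domain,
    all other commercial regions use amazonaws.com.
    """
    suffix = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"s3.{region}.{suffix}"


def build_hdfs_site(access_key: str, secret_key: str, region: str) -> str:
    """Build an hdfs-site.xml body with the S3A settings from the answer."""
    props = [
        ("fs.s3a.access.key", access_key),
        ("fs.s3a.secret.key", secret_key),
        # Route s3:// URLs (as used in s3.url) through S3AFileSystem.
        ("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
        # Regional endpoint; without it the client uses the global default.
        ("fs.s3a.endpoint", s3_endpoint_for_region(region)),
    ]
    root = ET.Element("configuration")
    for name, value in props:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")


if __name__ == "__main__":
    # Placeholder credentials, as in the answer above.
    print(build_hdfs_site("BALABALABALA", "balabalabalabala", "cn-north-1"))
```

Keeping the credentials and endpoint in one generated file avoids the failure mode in the question, where valid keys were sent to an endpoint that does not serve the bucket's region.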