2017-09-19

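Flink savepoint is declined

I'm trying to use savepoints with a job that implements a custom parallel socket source. On my local machine, the savepoint is declined with the following exception: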

Cancelling job 4c99e0220c8c4683d1287269073b5c2c with savepoint to savepoints/. 


java.lang.Exception: Canceling the job with ID 4c99e0220c8c4683d1287269073b5c2c failed. 
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092) 
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133) 
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130) 
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130) 
Caused by: java.lang.Exception: Failed to trigger savepoint. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629) 
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) 
    at akka.dispatch.OnComplete.internal(Future.scala:247) 
    at akka.dispatch.OnComplete.internal(Future.scala:245) 
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) 
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Checkpoint was declined (tasks not ready) 
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortDeclined(PendingCheckpoint.java:510) 
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:735) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply$mcV$sp(JobManager.scala:1491) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$2.apply(JobManager.scala:1490) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    ... 6 more 
On the cluster, I get an exception that looks like this:

flink-1.1.3/bin//flink cancel -s hdfs://flink-master:19000/flink-checkpoints a18499a80099045eb5120ecacdabd421 
Retrieving JobManager. 
Using address flink-master/10.0.0.16:6123 to connect to JobManager. 
Cancelling job a18499a80099045eb5120ecacdabd421 with savepoint to hdfs://flink-master:19000/flink-checkpoints. 

java.lang.Exception: Canceling the job with ID a18499a80099045eb5120ecacdabd421 failed. 
    at org.apache.flink.client.CliFrontend.cancel(CliFrontend.java:637) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1092) 
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133) 
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130) 
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657) 
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130) 
Caused by: java.lang.Exception: Failed to trigger savepoint. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:639) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$6.apply(JobManager.scala:629) 
    at org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272) 
    at akka.dispatch.OnComplete.internal(Future.scala:247) 
    at akka.dispatch.OnComplete.internal(Future.scala:245) 
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175) 
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) 
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.io.EOFException: Premature EOF: no length prefix available 
    at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282) 
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347) 
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266) 
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449) 
    Suppressed: java.lang.IllegalArgumentException: Self-suppression not permitted 
     at java.lang.Throwable.addSuppressed(Throwable.java:1043) 
     at java.io.FilterOutputStream.close(FilterOutputStream.java:159) 
     at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:207) 
     at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.storeSavepointToHandle(SavepointStore.java:150) 
     at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpointExternalized(PendingCheckpoint.java:281) 
     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:888) 
     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:813) 
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply$mcV$sp(JobManager.scala:1462) 
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) 
     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$handleCheckpointMessage$1.apply(JobManager.scala:1461) 
     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
    [CIRCULAR REFERENCE:java.io.EOFException: Premature EOF: no length prefix available] 

The source looks something like this:

@Override
public void run(SourceContext<String> sourceContext) throws Exception {
    // Each parallel subtask connects to the host/port at its own subtask index.
    int idx = getRuntimeContext().getIndexOfThisSubtask();
    String[] hosts = (config.hostsStr).split(":");
    String[] portStrArr = (config.portsStr).split(":");
    int[] ports = new int[portStrArr.length];
    for (int i = 0; i < portStrArr.length; i++) {
        ports[i] = Integer.parseInt(portStrArr[i]);
    }
    Socket s = new Socket(hosts[idx], ports[idx]);
    BufferedReader in = new BufferedReader(new InputStreamReader(s.getInputStream()));
    //ois = new ObjectInputStream(s.getInputStream());
    // Read one line at a time and forward it downstream until cancel() is called.
    while (running) {
        String str = in.readLine();
        sourceContext.collect(str);
    }
    sourceContext.close();
}

@Override
public void cancel() {
    running = false;
}

Could it be that the source cannot be stopped correctly, so the checkpoint never happens? On the cluster, Flink reports that the savepoint succeeded and returns its location, but there is no file at that path.

Answer

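Judging from the excerpt of your source function, it looks almost fine to me. The one thing you should do is emit the elements while holding the checkpoint lock. Otherwise, you can run into problems when an element is emitted at the same time a checkpoint is triggered. SourceContext#getCheckpointLock ensures that these two operations do not happen concurrently.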
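To illustrate the checkpoint-lock advice without requiring a Flink dependency, here is a minimal, Flink-free model of the pattern. The class and method names (LockedLineSource, snapshotLinesEmitted) are hypothetical: a plain Object stands in for SourceContext#getCheckpointLock(), and appending to a list stands in for sourceContext.collect(...). In real Flink code the loop body would be wrapped in synchronized (sourceContext.getCheckpointLock()) { ... }, and Flink itself takes the checkpoint while holding that same lock. As an extra assumption, the sketch also stops when readLine() returns null (end of stream), which the original loop does not check.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of "emit under the checkpoint lock".
public class LockedLineSource {
    // Plays the role of SourceContext#getCheckpointLock().
    private final Object checkpointLock = new Object();
    // volatile so that cancel() called from another thread is seen by the loop.
    private volatile boolean running = true;
    // Stand-ins for the downstream collector and for the source's state.
    private final List<String> emitted = new ArrayList<>();
    private long linesEmitted = 0;

    /** Reads lines until end of stream or cancel(); each emit holds the lock. */
    public void run(BufferedReader in) throws IOException {
        String line;
        while (running && (line = in.readLine()) != null) {
            synchronized (checkpointLock) {
                emitted.add(line); // analogous to sourceContext.collect(line)
                linesEmitted++;    // state update is atomic with the emit
            }
        }
    }

    public void cancel() {
        running = false;
    }

    /** A consistent "snapshot": never observed in the middle of an emit. */
    public long snapshotLinesEmitted() {
        synchronized (checkpointLock) {
            return linesEmitted;
        }
    }

    public List<String> emitted() {
        synchronized (checkpointLock) {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) throws IOException {
        LockedLineSource source = new LockedLineSource();
        source.run(new BufferedReader(new StringReader("a\nb\nc\n")));
        System.out.println(source.emitted() + " " + source.snapshotLinesEmitted());
    }
}
```

Running main prints [a, b, c] 3. The point of holding one lock for both the emit and the snapshot is that a checkpoint can never capture state that disagrees with what has already been sent downstream.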

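The HDFS error (java.io.EOFException: Premature EOF: no length prefix available) looks as if there is a problem on the HDFS side. Could you check the logs to see whether they contain anything suspicious? It might be that the DataNodes are running out of disk space.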
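A common way to check the disk-space hypothesis cluster-wide is hdfs dfsadmin -report, which lists the remaining DFS capacity per DataNode. For checking a single DataNode host directly, here is a minimal Java sketch. It assumes you pass the directories configured in dfs.datanode.data.dir as command-line arguments; the class name DiskSpaceCheck and the 1 GiB threshold are hypothetical example choices.

```java
import java.io.File;

// Hypothetical helper: reports usable space for each directory given on the command line.
public class DiskSpaceCheck {
    /** Returns true if dir has fewer than minFreeBytes bytes usable by this JVM. */
    public static boolean isLow(File dir, long minFreeBytes) {
        return dir.getUsableSpace() < minFreeBytes;
    }

    public static void main(String[] args) {
        long minFreeBytes = 1L << 30; // 1 GiB: arbitrary example threshold
        for (String path : args) {
            File dir = new File(path);
            if (!dir.isDirectory()) {
                // getUsableSpace() returns 0 for non-existent paths, so report them separately.
                System.out.println(path + ": not a directory");
                continue;
            }
            long usableMiB = dir.getUsableSpace() >> 20;
            System.out.println(path + ": " + usableMiB + " MiB usable"
                    + (isLow(dir, minFreeBytes) ? " (LOW)" : ""));
        }
    }
}
```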

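The "Checkpoint was declined (tasks not ready)" exception indicates that something went wrong while taking the checkpoint. The JobManager log should contain a log statement explaining why the checkpoint failed, something like: Discarding checkpoint CHECK_POINT_ID because of checkpoint decline from task EXECUTION_ID : REASON.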
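To find that statement quickly in a large JobManager log, here is a small Java sketch that scans a log file for it. It assumes the line has the shape "Discarding checkpoint <id> because of checkpoint decline from task <executionId> : <reason>"; the exact wording may differ between Flink versions, so adjust the regular expression to what your log actually contains. The class name DeclineLogScanner and the log path argument are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts checkpoint-decline entries from a JobManager log.
public class DeclineLogScanner {
    // Assumed message shape; verify against the log output of your Flink version.
    private static final Pattern DECLINE = Pattern.compile(
            "Discarding checkpoint (\\d+) because of checkpoint decline from task (\\S+) : (.*)");

    /** Returns "checkpointId | taskExecutionId | reason" for each matching line. */
    public static List<String> findDeclines(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            Matcher m = DECLINE.matcher(line);
            if (m.find()) {
                out.add(m.group(1) + " | " + m.group(2) + " | " + m.group(3));
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java DeclineLogScanner path/to/jobmanager.log
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        for (String decline : findDeclines(lines)) {
            System.out.println(decline);
        }
    }
}
```

Only the first line of each matching entry is captured, so if the reason continues onto following lines (for example, a nested stack trace), look at the surrounding lines in the log as well.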