2017-03-06 3 views
1

1 개의 JobManager와 2 개의 TaskManagers로 구성된 HA Flink v1.2 클러스터를 자체 VM (YARN 또는 hdfs를 사용하지 않음)에 각각 만들었습니다. JobManager 노드에서 작업을 시작하면 하나의 TaskManager 인스턴스가 종료됩니다. 즉시 웹 대시 보드에서 작업 취소 및 실패를 볼 수 있습니다. 나는 로그를 확인하는 경우 : 작업의 구현에TaskManager가 실패 할 때 예상되는 HA 동작

03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(2/2) switched to SCHEDULED 
03/06/2017 16:23:50 Flat Map(2/2) switched to DEPLOYING 
03/06/2017 16:23:50 Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(1/2) switched to RUNNING 
03/06/2017 16:23:50 Flat Map(2/2) switched to RUNNING 
03/06/2017 16:23:50 Source: Custom Source -> Flat Map(2/2) switched to RUNNING 
03/06/2017 16:25:38 Flat Map(1/2) switched to FAILED 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost. 
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610) 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    at java.lang.Thread.run(Thread.java:745) 

03/06/2017 16:25:38 Job execution switched to status FAILING. 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'ip-10-106-0-238/10.106.0.238:40578'. This might indicate that the remote task manager was lost. 
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelInactive(PartitionRequestClientHandler.java:118) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:294) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:223) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:829) 
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:610) 
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
    at java.lang.Thread.run(Thread.java:745) 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Flat Map(2/2) switched to CANCELING 
03/06/2017 16:25:38 Source: Custom Source -> Flat Map(1/2) switched to CANCELED 
03/06/2017 16:26:18 Source: Custom Source -> Flat Map(2/2) switched to CANCELED 
03/06/2017 16:26:18 Flat Map(2/2) switched to CANCELED 

을 나는

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, // number 
                   // of 
                   // restart 
                   // attempts 
     Time.of(10, TimeUnit.SECONDS) // delay 
)); 

내 질문은 JobManager가 자동으로 나머지/실행 윈도우 작업 관리자에 대한 모든 요청을 리디렉션 안입니다 있나요? 마찬가지로 JobManager와 1 TaskManager 인스턴스를 시작하고 작업을 실행하면 두 번째 TaskManager 인스턴스를 시작할 때 실행중인 작업을 해결하는 데 기여해야합니까?

감사합니다.

답변

1

우선 RestartStrategy은 HA 모드와 아무 관련이 없습니다. 고 가용성은 JobManager의 가용성과 관련이 있습니다. 어쨌든 HA가 작동하려면 적어도 두 개의 JobManagers 인스턴스가 필요합니다 (단 하나만 시작한다고 말했음).

RestartStrategy의 경우 fixedDelayRestart 전략을 실패한 후에 지정하면 (예 : TaskManager를 종료 할 때와 같이) 작업이 다시 한 번 실행됩니다 (10 초 후). 설치에 해당하지 않는 경우 작업을 실행하는 데 사용할 수있는 리소스가 누락 된 것 같습니다 (TaskManager 당 하나의 작업 슬롯이 있다고 가정합니다. 하나만 남아 있으면 병렬 처리 2 이상의 작업을 실행할 수 없음) .

마지막 질문에 TaskManager을 추가한다고해서 실행중인 작업에 영향을 미치지 않습니다. 어떻게 든 연결된 동작을 동적 확장이라고합니다. 저장 점을 취한 다음 더 많은 자원으로 다시 실행하여이를 수행 할 수 있습니다. 좀 봐 here. 자동 재조정은 진행중인 작업입니다.

+0

안녕하세요, 제게 답을 해 주셔서 감사합니다. 병렬 테스트가 1로 설정되고 각 TaskManager가 1 슬롯으로 설정된 새 테스트를 만들었습니다. 당신은 나머지 작업 관리자에서 작업이 재 시도되었지만 오류가 발생합니다 : java.io.FileNotFoundException :/home/우분투/프로토 타입/flink/flink-checkpoints/6fc6168a1e5a6a27f58f6d57deeacb65/chk-37/31c325f7-2b57-4e6b -bc20-3f6e9390a724 (해당 파일 또는 디렉토리 없음). 체크 포인트를 두 번째 TaskManager에서 사용할 수없는 것 같습니다. 이로 인해 작업이 실패하게됩니다. TaskManager간에 검사 점이 동기화되는지 알고 있습니까? – razvan

+0

체크 포인트가 저장되는 위치는 사용 된 StateBackend에 따라 다릅니다. 자세한 내용은 다음을 참조하십시오. https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/state_backends.html#state-backends –

+0

물론, 분명합니다. 백엔드 상태로 파일 시스템을 사용합니다. 하지만 각 TaskManager (다른 VM에 있음)에 로컬 경로를 설정하고 예를 들어 JobManager에서 상태를 동기화하는 프레임 워크를 기다리고있었습니다. 분명히 그렇지 않습니다. TaskManager가 다운되어 로컬에 검사 점을 저장하면 작업이 실패합니다. 모든 TaskManagers 백엔드 상태 localtion이 동일한 경로를 가리켜 야하는지 알고 있습니까? 거기에 이름 UUID가있는 폴더를 생성하고 충돌이 없는지 확인하려고합니다. – razvan