2011-08-03 2 views
2

종료 시점을 포함하여 많은 시간 동안 외부 프레임 워크에서 들어오는 비동기 호출이 넘쳐나는 것을 처리해야하는 Java 응용 프로그램을 개발 중입니다. 정상적으로 작동하는 동안 이러한 수신 호출은 다시 비동기 적으로 다른 프레임 워크로 전달되어야합니다.비동기 호출에 대한 정상 종료 확인 잠금을 구현하거나 예외를 처리하려면?

지금은 모듈이 "양호한"시민으로 설정되어 있고 종료 플래그를 중심으로 잠그는 기능이 있습니다.이 플래그는 일단 설정되면 정상적인 발신 통화의 발송을 중단합니다.

들어오는 호출과 나가는 호출 모두 비동기식이기 때문에 동일한 종료 플래그 검사 (아래 참조)를 수행하는 각 "작업자"작업을 수행해야합니다. EDIT : 세마포어를 사용하는 것은 하나의 획득/릴리스가 각 작업자에게 필요하다는 것을 다른 질문으로 저에게 지적되었습니다.) 작동하지만 이러한 작업자 작업이 많아 성능이 누적 될 수 있다는 점에 대해 걱정합니다. 프레임 워크가 약간 확장되면 프로파일 링이 곧 제공되지만 결과에 관계없이 모범 사례를 따르는 것이 좋습니다.

대체 방법으로는 단순히 종료 플래그 확인 잠금을 수행하지 않고 비동기 호출이 처리를 완료하기 전에 외부 프레임 워크가 종료 될 때 생성되는 예상 예외를 처리하는 것입니다. 이 접근법이 취해진다면 해로운 운영상의 효과는 없다고 덧붙여 야합니다. 두 가지 방법 모두 정상적인 종료가됩니다.

귀하의 아이디어가 더 나은 방법입니까? 예외가없는 무거운 손으로 고정하는 것, 잠금을 해제하는 것, 예외를 포격하는 것. 자물쇠와

은 작업자의 작업 코드는 다음과 같은 :

final private ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock(); 
private boolean isShutdown; 

private void workerTask() 
{ 
    try 
    { 
     shutdownLock.readLock().lock(); 

     if (isShutdown) 
     return; 

     executeAsynchronously(new Runnable() 
     { 
     @Override 
     final public void run() 
     { 
      try 
      { 
       shutdownLock.readLock().lock(); 

       if (isShutdown) 
        return; 

       // Do something here. 
      } 
      finally 
      { 
       shutdownLock.readLock().unlock(); 
      } 
     } 
     }); 
    } 
    finally 
    { 
     shutdownLock.readLock().unlock(); 
    } 
} 

셧다운 방법은 shutdownLock.writeLock(), 다음 isShutdown 플래그를 설정을 요청합니다.

volatile private boolean isShutdown; 

private void workerTask() 
{ 
    try 
    { 
     executeAsynchronously(new Runnable() 
     { 
     @Override 
     final public void run() 
     { 
      try 
      { 
       // Do something here. 
      } 
      catch (final FrameworkRuntimeException exception) 
      { 
       if ((! isShutdown) || (exception.type != 
             FrameworkRuntimeException.FrameworkIsDisposed)) 
        throw exception; 
      } 
     } 
     }); 
    } 
    catch (final FrameworkRuntimeException exception) 
    { 
     if ((! isShutdown) || (exception.type != 
          FrameworkRuntimeException.FrameworkIsDisposed)) 
     throw exception; 
    } 
} 

이 구현의 종료 방법이 참으로 휘발성 isShutdown 플래그를 설정합니다

잠금 및 종료 생성 된 예외를 기대하지 않고 다른 방법은 다음과 같이 보입니다. 어떤 의견에 미리

감사합니다,

러스

편집 : 그것은 유용하게 내가 첫 번째 방법에 이중 잠금을 방지하기 위해 세마포어를 사용할 수있는 또 다른 질문에 나에게 지적되어있어, 결국 무거운 짐이되지는 않을 것이지만 문제는 여전히 남아 있습니다.

+1

이 필드에는 많은 경험이 없지만 첫 번째 해결 방법 (잠금 장치)은 약간 과장된 것으로 보입니다. 또한 "무언가"가 무엇인지는 말하지 않지만 안전을 위해 하나 또는 두 개의 if (isShutDown) return 만 넣을 수 있습니다. – toto2

+0

맞습니다. 과도한 것입니다. 이중 잠금 대신에 세마포어 허용을 사용할 수 있습니다. 이 방법을 사용하면 바깥 쪽 바디의 종료 플래그에 대해 한 번만 확인하면됩니다. 댓글 주셔서 감사합니다. – Russ

+0

내가 잘 약간의 오버 헤드 및 종료 플래그와 함께이 문제를 해결하는 방법에 대해 생각하고 (흥미로운 부분은 하나가 다른 스레드가 추가로 메시지를 보낼 수 없음을 확신 할 수있는 종료는 즉시 시작할 수 있다는 것입니다)하지만, 정말 예외 경우 접근 방식이 실제로 잘 작동합니다. 간단하고 코드가 적으며 오버 헤드가 없습니다. – Voo

답변

0

그래, 여기이게 작동해야하고 너무 많은 런타임 오버 헤드를 방해하지 않아야한다고 생각합니다. (여기에 아침에 4.30이 나오므로 더 나은 double check가 있습니다.)). 또한 좋은 try {} finally {} 코드 블록을 던지는 것은 꽤 좋은 아이디어이지만 명확성을 위해 생략되었음을 주목하십시오.

public static final AtomicInteger activeConnections = new AtomicInteger(); 
public static volatile boolean shutdown = false; 

public static void shutdown() { 
    shutdown = true; 
    while (activeConnections.get() > 0) { 
     synchronized(activeConnections) { 
      try { 
       activeConnections.wait(); 
      } 
      catch(InterruptedException e) { 
      } 
     } 
    } 
    // proceed shutdown 
} 

public static void run() { 
    if (shutdown) return; 
    activeConnections.incrementAndGet(); 
    if (shutdown) { 
     leave(); 
     return; 
    } 
    // do stuff 
    leave(); 
} 

private static void leave() { 
    int outstandingConnections = activeConnections.decrementAndGet(); 
    if (shutdown && outstandingConnections == 0) { 
     synchronized(activeConnections) { 
      activeConnections.notifyAll(); 
     } 
    }  
} 

종료 플래그가 설정되면 바로 새 스레드가 작동하지 않습니다. 모든 스레드는 외부 프레임 워크와 통신하는 동안 정수를 증가시키고 완료되면 감소시킵니다. 셧다운은 스레드가 더 이상 통신하지 않는 한 바로 진행될 수 있습니다. 셧다운 플래그가 처음 설정되었으므로 새로운 스레드는 더 이상 시작하지 않습니다.

당신은 꽤 가벼운 AtomicInteger과 휘발성 메모리 장벽 (즉, CAS 루프로 구현이야, 당신은 훨씬 낮은 오버 헤드를 얻을 수 없다) 얻을 그런 식으로. 이제

나는 아직도 내 첫 코멘트에 서서는 예외를 잡기 위해 간단하게,보다 효율적이고 짧은 있다고 말하지만, 나는 문제 : 일반적으로

+0

이 문제를 해결하는 데 시간을내어 주셔서 감사합니다. (!) : - 종료()의 두번째 라인에서 스레드 A, activeConnections.get()가 반환 1. - 스레드 난 당신이 대답하여 오전 4시 반으로 예정하고 있지만 일부 필요한까지 강화가있을 수 있습니다 경우처럼 B는 마지막 작업이므로 notifyAll()을 포함하여 leave()를 모두 실행합니다. - 이제 스레드 A가 동기화 블록에 도달하여 영원히 기다립니다. – Russ

+0

이전 질문에 답한 사람은 휘발성 물질 및 원자력을 제외하고 wait & notifyAll을 수행하는 데 좋은 패턴이 있습니다. http://stackoverflow.com/questions/6921530/java-read-write-lock -requirement-with-lock-and-release-from-different-threads/6921738 # 6921738. 분들께 예외가 더 효율적일 수 있습니다 일,하지만 난 그것을 내가 세마포어를 사용하여 근로자 및 종료 스레드 사이에 신호를 보냅니다 그런 식으로 일을 결국하지 않는 경우 (위와 같은 페이지에서 예 참조). – Russ

+0

@Russ 네가 맞다. 나는 그 중 하나를 해결할 수 있다고 생각하지만, 세마포어 솔루션은 정말 멋지다 - 모든 허가를 얻는 시스템 종료를 가진 "제한없는"세마포어 (심지어 한 번에 1 개 이상의 허가를 얻을 수 있다는 것을 알지도 못했다). 훌륭한 솔루션! 실행자 서비스에 대해 – Voo

1

난 당신이 체크 접근 방식을 선호하는 것을 좋아하고있어 종료하려면 작업을 실행하십시오. 당신이 낙관적으로 그리고 나서 당신이 알고있는 예외를 버리면 종료로 인해 그때 당신은 오류를 잘못 분류하고 실제 문제를 놓치는 위험을 감수해야합니다.

지금까지의 코드를 단순화하는가는대로, 당신은 모든 잠금을 제거 할 수 및 당신의 executeAsynchronously 방법은 ExecutorService 사용하는지 확인 - 다음 shutdown 방법은 단지 서비스에 shutdown를 호출 작업 생성이 경우 건너 뛸 수 있습니다 isShutdown이 true를 반환하고 반환하기 전에 작업이 완료 될 때까지 기다릴 필요가있는 경우 유용한 awaitTermination 메서드를 사용할 수 있습니다.

+0

+1. 난 그냥 종료시 Scheduled' 방법 @'봄과이 문제로 달리고, 나는에 있었다 [구현'만약 ApplicationListener '] (http://stackoverflow.com/a/6603443/433348)에 대한 후크를 만들려면 TaskScheduler를 종료합니다 (실행 프로그램과 유사). –

0

당신은 코드 아래 봄 gracefull 종료 작동 사용하는 경우. 재시도 횟수를 변경할 수 있습니다.

package com.avea.vpspg.test.schedulers; 

import java.util.Map; 
import java.util.concurrent.TimeUnit; 

import org.apache.log4j.Logger; 
import org.springframework.beans.BeansException; 
import org.springframework.beans.factory.config.BeanPostProcessor; 
import org.springframework.context.ApplicationContext; 
import org.springframework.context.ApplicationContextAware; 
import org.springframework.context.ApplicationListener; 
import org.springframework.context.event.ContextClosedEvent; 
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; 
import org.springframework.stereotype.Component; 

import com.avea.vpspg.core.VasProvLogger; 

@Component 
class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{ 


private ApplicationContext context; 

public Logger logger = XProvLogger.getInstance().x; 

public void onApplicationEvent(ContextClosedEvent event) { 


    Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class); 

    for (ThreadPoolTaskScheduler scheduler : schedulers.values()) {   
     scheduler.getScheduledExecutor().shutdown(); 
     try { 
      scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS); 
      if(scheduler.getScheduledExecutor().isTerminated() || scheduler.getScheduledExecutor().isShutdown()) 
       logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stoped"); 
      else{ 
       logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stoped normally and will be shut down immediately"); 
       scheduler.getScheduledExecutor().shutdownNow(); 
       logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately"); 
      } 
     } catch (IllegalStateException e) { 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class); 

    for (ThreadPoolTaskExecutor executor: executers.values()) { 
     int retryCount = 0; 
     while(executor.getActiveCount()>0 && ++retryCount<51){ 
      try { 
       logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount); 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     if(!(retryCount<51)) 
      logger.info("Executer "+executor.getThreadNamePrefix()+" is still working.Since Retry count exceeded max value "+retryCount+", will be killed immediately"); 
     executor.shutdown(); 
     logger.info("Executer "+executor.getThreadNamePrefix()+" with active " + executor.getActiveCount()+" work has killed"); 
    } 
} 


@Override 
public void setApplicationContext(ApplicationContext context) 
     throws BeansException { 
    this.context = context; 

} 


@Override 
public Object postProcessAfterInitialization(Object object, String arg1) 
     throws BeansException { 
    return object; 
} 


@Override 
public Object postProcessBeforeInitialization(Object object, String arg1) 
     throws BeansException { 
    if(object instanceof ThreadPoolTaskScheduler) 
     ((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true); 
    if(object instanceof ThreadPoolTaskExecutor) 
     ((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true); 
    return object; 
}