2017-02-19 10 views
1

: 데이터를받지 못하면 데이터를 보내고 다시 시도 하시겠습니까? 내가 일 이하로 할 필요가 프로젝트를 진행하고

  • 다른 시스템에 특정 소켓에 특정 데이터를 전송합니다. 주어진 소켓에 특정 바이트 배열을 보내야합니다. 각 바이트 배열에는 고유 한 긴 주소가 있습니다.
  • 그런 다음 아래에 구현 된 RetryStrategy 중 하나를 사용하여 동일한 데이터를 보내려면 계속 시도하십시오.
  • 보내신 데이터가 다른 시스템에서 수신되었는지 여부를 알려주는 백그라운드 폴러 스레드를 시작하십시오. 수신 된 경우 다시 시도하지 않도록 pending 대기열에서 제거하고 어떤 이유로 든 수신하지 못한 경우 우리가 사용한 RetryStrategy를 사용하여 동일한 데이터를 다시 전송하려고 시도합니다.
  • 예를 들어

: 우리는 addressA로 고유의 긴 주소가 byteArrayA을 전송하고이를 다른 시스템에서 recived 경우, 다음 내 폴러 스레드는 승인이 지금 그렇게받은 의미로 다시이 addressA을받을 경우 우리 보류중인 큐에서이 주소를 제거하여 다시 시도하지 않도록 할 수 있습니다.

나는 ConstantBackoffExponentialBackoff 두 가지를 구현했습니다. 그래서 위의 흐름을 시뮬레이트하는 시뮬레이터를 생각해 냈습니다.

public class Experimental { 
    /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */ 
    interface RetryStrategy { 
    long getDelayMs(int retry); 
    } 

    public enum ConstantBackoff implements RetryStrategy { 
    INSTANCE; 
    @Override 
    public long getDelayMs(int retry) { 
     return 1000L; 
    } 
    } 

    public enum ExponentialBackoff implements RetryStrategy { 
    INSTANCE; 
    @Override 
    public long getDelayMs(int retry) { 
     return 100 + (1L << retry); 
    } 
    } 

    /** A container that sends messages with retries. */  
    private static class Sender { 
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(20); 
    private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>(); 

    /** Send the given (simulated) data with given address on the given socket. */ 
    void sendTo(long addr, byte[] data, int socket) { 
     System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket); 
    } 

    /** The state of a message that's being retried. */ 
    private class Retrier implements Runnable { 
     private final RetryStrategy retryStrategy; 
     private final long addr; 
     private final byte[] data; 
     private final int socket; 
     private int retry; 
     private Future<?> future; 

     Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) { 
     this.retryStrategy = retryStrategy; 
     this.addr = addr; 
     this.data = data; 
     this.socket = socket; 
     this.retry = 0; 
     } 

     private synchronized void start() { 
     if (future == null) { 
      future = executorService.submit(this); 
      pending.put(addr, this); 
     } 
     } 

     private synchronized void cancel() { 
     if (future != null) { 
      future.cancel(true); 
      future = null; 
     } 
     } 

     private synchronized void reschedule() { 
     if (future != null) { 
      future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS); 
     } 
     } 

     @Override 
     synchronized public void run() { 
     sendTo(addr, data, socket); 
     reschedule(); 
     } 
    } 

    /** 
    * Get a (simulated) verified message address. Just picks a pending 
    * one. Returns zero if none left. 
    */  
    long getVerifiedAddr() { 
     System.err.println("Pending messages: " + pending.size()); 
     Iterator<Long> i = pending.keySet().iterator(); 
     long addr = i.hasNext() ? i.next() : 0; 
     return addr; 
    } 

    /** A polling loop that cancels retries of (simulated) verified messages. */   
    class CancellationPoller implements Runnable { 
     @Override 
     public void run() { 
     while (!Thread.currentThread().isInterrupted()) { 
      try { 
      Thread.sleep(1000); 
      } catch (InterruptedException ex) { 
      Thread.currentThread().interrupt(); 
      } 
      long addr = getVerifiedAddr(); 
      if (addr == 0) { 
      continue; 
      } 
      System.err.println("Verified message (to be cancelled) " + addr); 
      Retrier retrier = pending.remove(addr); 
      if (retrier != null) { 
      retrier.cancel(); 
      } 
     } 
     } 
    } 

    private Sender initialize() { 
     executorService.submit(new CancellationPoller()); 
     return this; 
    } 

    private void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) { 
     new Retrier(retryStrategy, addr, data, socket).start(); 
    } 
    } 

    public static void main(String[] args) { 
    Sender sender = new Sender().initialize(); 
    for (long i = 1; i <= 10; i++) { 
     sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42); 
    } 
    for (long i = -1; i >= -10; i--) { 
     sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37); 
    } 
    } 
} 

위의 코드 또는 스레드 안전 문제에 어떤 경쟁 조건이 있습니까? 다중 스레드에서 바로 잡기가 어렵 기 때문에.

동일한 작업을 수행하는 더 효과적인 방법이 있는지 알려주십시오.

+0

내 조언은 정말 너무하지 않는 한 스레드를 사용하지 않는 것입니다 ... 하나의 스레드 내에서 모든 것을 수행하기 위해 이벤트 루프를 사용할 수 있지만 여전히 비동기 실행이 가능합니다. – AJC

답변

0

코드에 제 3 자 라이브러리와 아무런 관련이 없다면 https://github.com/nurkiewicz/async-retry을 살펴볼 수 있습니다. 내가 당신의 문제를 올바르게 이해했다고 가정하면,이 라이브러리는 바퀴를 다시 만들지 않고도 그런 종류의 문제를 처리 할 수있게 해줍니다. 그것은 꽤 명확하게 쓰여졌 고 문서화가 잘되어있어서 갈 길을 찾을 수 있기를 바랍니다. :)

+0

[Spring Retry] (https://github.com/spring-projects/spring-retry) 다른 Spring Project의 [independent] (http://docs.spring.io/spring-batch/reference/html/retry.html)에서 사용할 수 있습니다. . – mfulton26