2016-06-09 1 views
1

배치 및 세척 기능을 가진 소비자 :프로듀서/나는 두 가지 방법이 일괄 메일 서비스 쓰기를 시도하고

add(Mail mail) : 우편물을 보낼 수는 생산자

flushMailService()에 의해 호출됩니다 서비스를 세척하십시오. 소비자는 목록을 가져 와서 다른 (값 비싼) 방법을 호출해야합니다. 일반적으로 비싼 방법은 배치 크기에 도달 한 후에 만 ​​호출해야합니다.

이이 질문에 다소 유사합니다 Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

poll()에 시간 제한이있는 것을 할 수 있습니다. 그러나 제작자는 타임 아웃을 기다리지 않고 생산자가 대기열에있는 메일을 보낼 수있게하면 메일 서비스를 플러시 할 수 있어야합니다.

poll(20, TimeUnit.SECONDS)은 중단 될 수 있습니다. 중단 된 경우 큐가 비어있을 때까지 일괄 처리 크기에 도달했는지 여부와 상관없이 큐의 모든 메일을 보내야합니다 (poll()을 사용하면 큐가 비어있는 경우 즉시 null을 반환 함). 비어 있으면 보낸 메일 그래서 다른 생산자에 의해 중단 때까지 이미 발송 한 중단 생산자는. 그런 다음, 생산자는 다시 poll의 버전을 차단 한 후 호출해야합니다.

이 주어진 구현과 작동하는 것으로 보인다.

내가 시도 ExecutorServicesFutures과 함께 사용하는 것이 좋습니다. 그러나 미래는 단지 이므로 중단 될 수 있습니다. 첫 번째 인터럽트 이후 취소 된 것으로 간주됩니다. 그래서 나는 여러 번 중단 될 수있는 쓰레드에 의지했다.

현재 작동하는 것처럼 보이지만 ("원시"스레드를 사용하는) 다음 구현이 있습니다.

이 방법이 합리적인 방법입니까? 또는 다른 접근법을 사용할 수 있습니까?

public class BatchMailService { 
    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 
    private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>(); 
    private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class); 

    public void checkMails() { 

     int batchSize = 100; 
     int timeout = 20; 
     int consumerCount = 5; 

     Runnable runnable =() -> { 
      boolean wasInterrupted = false; 

      while (true) { 
       List<Mail> buffer = new ArrayList<>(); 
       while (buffer.size() < batchSize) { 
        try { 
         Mail mail; 
         wasInterrupted |= Thread.interrupted(); 
         if (wasInterrupted) { 
          mail = queue.poll(); // non-blocking call 
         } else { 
          mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call 
         } 
         if (mail != null) { // mail found immediately, or within timeout 
          buffer.add(mail); 
         } else { // no mail in queue, or timeout reached 
          LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread()); 
          wasInterrupted = false; 
          break; 
         } 
        } catch (InterruptedException e) { 
         LOGGER.info("{} interrupted", Thread.currentThread()); 
         wasInterrupted = true; 
         break; 
        } 
       } 
       if (!buffer.isEmpty()) { 
        LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size()); 
        mailService.sendMails(buffer); 
       } 
      } 
     }; 

     LOGGER.info("starting 5 threads "); 
     for (int i = 0; i < 5; i++) { 
      Thread thread = new Thread(runnable); 
      threads.add(thread); 
      thread.start(); 
     } 

    } 

    public void addMail(Mail mail) { 
     queue.add(mail); 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     for (Thread t : threads) { 
      t.interrupt(); 
     } 
    } 
} 

인터럽트없이 또 다른 방법,하지만 독약 (Mail POISON_PILL = new Mail())의 변형은 다음과 같은 수 있습니다. 아마 하나의 소비자 스레드가있을 때 가장 잘 작동합니다. 적어도 하나의 독약에 대해 단 한 명의 소비자 만 계속됩니다.

Runnable runnable =() -> { 
     boolean flush = false; 
     boolean shutdown = false; 

     while (!shutdown) { 
      List<Mail> buffer = new ArrayList<>(); 
      while (buffer.size() < batchSize && !shutdown) { 
       try { 
        Mail mail; 
        if (flush){ 
         mail = queue.poll(); 
         if (mail == null) { 
          LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed"); 
          flush = false; 
          break; 
         } 
        }else { 
         mail = queue.poll(5, TimeUnit.SECONDS); // blocking call 
        } 

        if (mail == POISON_PILL){ // flush 
         LOGGER.info(Thread.currentThread() + " got flush"); 
         flush = true; 
        } 
        else if (mail != null){ 
         buffer.add(mail); 
        } 
       } catch (InterruptedException e) { 
        LOGGER.info(Thread.currentThread() + " interrupted"); 
        shutdown = true; 
       } 
      } 
      if (!buffer.isEmpty()) { 
       LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails"); 
       mailService.sendEmails(buffer); 
      } 
     } 
    }; 

public void flushMailService() { 
    LOGGER.info("flushing BatchMailService"); 
    queue.add(POISON_PILL); 
} 

답변

1

인터럽트 대신 신호 및 대기를 사용하는 것이 어떻습니까?

제작자는 메일을 보내고 플러시해야 하는지를 알립니다. Dispatcher는 신호 또는 시간 초과를 기다리고 소비자 스레드에서 이메일을 전송합니다.

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 

public class BatchMailService { 

    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 

    public static final int BATCH_SIZE = 100; 
    public static final int TIMEOUT = 20; 
    public static final int CONSUMER_COUNT = 5; 

    private final Lock flushLock = new ReentrantLock(); 
    private final Condition flushCondition = flushLock.newCondition(); 

    MailService mailService = new MailService(); 

    public void checkMails() { 

     ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT); 

     while (true) { 

      try { 
       // wait for timeout or for signal to come 
       flushLock.lock(); 
       flushCondition.await(TIMEOUT, TimeUnit.SECONDS); 

       // flush all present emails 
       final List<Mail> toFLush = new ArrayList<>(); 
       queue.drainTo(toFLush); 

       if (!toFLush.isEmpty()) { 
        consumerExecutor.submit(() -> { 
         LOGGER.info("{} sending {} mails", Thread.currentThread(), toFLush.size()); 
         mailService.sendEmails(toFLush); 
        }); 
       } 

      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       break; // terminate execution in case of external interrupt 
      } finally { 
       flushLock.unlock(); 
      } 
     } 

    } 

    public void addMail(Mail mail) { 

     queue.add(mail); 

     // check batch size and flush if necessary 
     if (queue.size() >= BATCH_SIZE) { 

      try { 
       flushLock.lock(); 
       if (queue.size() >= BATCH_SIZE) { 
        flushMailService(); 
       } 
      } finally { 
       flushLock.unlock(); 
      } 
     } 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     try { 
      flushLock.lock(); 
      flushCondition.signal(); 
     } finally { 
      flushLock.unlock(); 
     } 
    } 

} 
+0

잘 경우 생산자 확인 여러 소비자가있는 경우 일괄 처리 크기가 작지 메일로 메일 서비스를 여러 번 호출 할 수 있습니다, 다음에 도달 큐와 플러시의 크기? – user140547

+0

이 문제가 발생하지 않도록 잠금 기능이 있습니다. 누락 된 오류가있을 수 있지만 올바르게 표시됩니다. 'addMail()'은'queue.size()'를 두 번째로 확인하기 전에'fluskLock'을 획득합니다. 'checkMails()'가'await()'에서 깨어 나면 flush를 계속하기 전에'flushLock'을 얻습니다. [link] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition).html # awaitNanos (long)) –

+0

5 개의 병렬 생성자와 1500 개의 메일로 코드를 실행했습니다. 메일이 중복되지 않고 일괄 처리에 'BATCH_SIZE' 항목이 적 으면 아무 것도 없었습니다. –