배치 및 세척 기능을 가진 소비자 :프로듀서/나는 두 가지 방법이 일괄 메일 서비스 쓰기를 시도하고
add(Mail mail)
: 우편물을 보낼 수는 생산자
flushMailService()
에 의해 호출됩니다 서비스를 세척하십시오. 소비자는 목록을 가져 와서 다른 (값 비싼) 방법을 호출해야합니다. 일반적으로 비싼 방법은 배치 크기에 도달 한 후에 만 호출해야합니다.
이이 질문에 다소 유사합니다 Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch
이 poll()
에 시간 제한이있는 것을 할 수 있습니다. 그러나 제작자는 타임 아웃을 기다리지 않고 생산자가 대기열에있는 메일을 보낼 수있게하면 메일 서비스를 플러시 할 수 있어야합니다.
poll(20, TimeUnit.SECONDS)
은 중단 될 수 있습니다. 중단 된 경우 큐가 비어있을 때까지 일괄 처리 크기에 도달했는지 여부와 상관없이 큐의 모든 메일을 보내야합니다 (poll()
을 사용하면 큐가 비어있는 경우 즉시 null
을 반환 함). 비어 있으면 보낸 메일 그래서 다른 생산자에 의해 중단 때까지 이미 발송 한 중단 생산자는. 그런 다음, 생산자는 다시 poll
의 버전을 차단 한 후 호출해야합니다.
이 주어진 구현과 작동하는 것으로 보인다.
내가 시도 ExecutorServices을 Futures과 함께 사용하는 것이 좋습니다. 그러나 미래는 단지 이므로 중단 될 수 있습니다. 첫 번째 인터럽트 이후 취소 된 것으로 간주됩니다. 그래서 나는 여러 번 중단 될 수있는 쓰레드에 의지했다.
현재 작동하는 것처럼 보이지만 ("원시"스레드를 사용하는) 다음 구현이 있습니다.
이 방법이 합리적인 방법입니까? 또는 다른 접근법을 사용할 수 있습니까?
public class BatchMailService {
private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);
public void checkMails() {
int batchSize = 100;
int timeout = 20;
int consumerCount = 5;
Runnable runnable =() -> {
boolean wasInterrupted = false;
while (true) {
List<Mail> buffer = new ArrayList<>();
while (buffer.size() < batchSize) {
try {
Mail mail;
wasInterrupted |= Thread.interrupted();
if (wasInterrupted) {
mail = queue.poll(); // non-blocking call
} else {
mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
}
if (mail != null) { // mail found immediately, or within timeout
buffer.add(mail);
} else { // no mail in queue, or timeout reached
LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
wasInterrupted = false;
break;
}
} catch (InterruptedException e) {
LOGGER.info("{} interrupted", Thread.currentThread());
wasInterrupted = true;
break;
}
}
if (!buffer.isEmpty()) {
LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
mailService.sendMails(buffer);
}
}
};
LOGGER.info("starting 5 threads ");
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
thread.start();
}
}
public void addMail(Mail mail) {
queue.add(mail);
}
public void flushMailService() {
LOGGER.info("flushing BatchMailService");
for (Thread t : threads) {
t.interrupt();
}
}
}
인터럽트없이 또 다른 방법,하지만 독약 (Mail POISON_PILL = new Mail()
)의 변형은 다음과 같은 수 있습니다. 아마 하나의 소비자 스레드가있을 때 가장 잘 작동합니다. 적어도 하나의 독약에 대해 단 한 명의 소비자 만 계속됩니다.
Runnable runnable =() -> {
boolean flush = false;
boolean shutdown = false;
while (!shutdown) {
List<Mail> buffer = new ArrayList<>();
while (buffer.size() < batchSize && !shutdown) {
try {
Mail mail;
if (flush){
mail = queue.poll();
if (mail == null) {
LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed");
flush = false;
break;
}
}else {
mail = queue.poll(5, TimeUnit.SECONDS); // blocking call
}
if (mail == POISON_PILL){ // flush
LOGGER.info(Thread.currentThread() + " got flush");
flush = true;
}
else if (mail != null){
buffer.add(mail);
}
} catch (InterruptedException e) {
LOGGER.info(Thread.currentThread() + " interrupted");
shutdown = true;
}
}
if (!buffer.isEmpty()) {
LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails");
mailService.sendEmails(buffer);
}
}
};
public void flushMailService() {
LOGGER.info("flushing BatchMailService");
queue.add(POISON_PILL);
}
잘 경우 생산자 확인 여러 소비자가있는 경우 일괄 처리 크기가 작지 메일로 메일 서비스를 여러 번 호출 할 수 있습니다, 다음에 도달 큐와 플러시의 크기? – user140547
이 문제가 발생하지 않도록 잠금 기능이 있습니다. 누락 된 오류가있을 수 있지만 올바르게 표시됩니다. 'addMail()'은'queue.size()'를 두 번째로 확인하기 전에'fluskLock'을 획득합니다. 'checkMails()'가'await()'에서 깨어 나면 flush를 계속하기 전에'flushLock'을 얻습니다. [link] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition).html # awaitNanos (long)) –
5 개의 병렬 생성자와 1500 개의 메일로 코드를 실행했습니다. 메일이 중복되지 않고 일괄 처리에 'BATCH_SIZE' 항목이 적 으면 아무 것도 없었습니다. –