2017-04-18 5 views
0

데이터베이스에서 매분 (20 메시지 쯤) 메시지 묶음을 선택해야하며이를 동시에 처리해야합니다. 매분마다 데이터베이스에서 메시지를 가져 오는 데 EJB 타이머 서비스 (스케줄러)를 사용하고 있습니다.EJB 타이머 서비스를 사용하는 Java 다중 스레드

기본적으로 매분 20-30 개의 메시지를 골라야하며이 메시지를 처리 ​​한 후에 메일을 보내야합니다. 메시지 처리와 관련된 데이터베이스 작업은 거의 없습니다.

java.concurrent 패키지의 실행 프로그램 서비스 프레임 워크를 사용할 수있는 방법과이 메시지가 매분 제출되는 방법을 제안 해주십시오.

+0

ExecutorService를 돕기위한 많은 자습서가 있습니다. 이 접근법에는 다른 고려 사항이 있습니다. 1) 20-30 메시지 처리에 1 분 이상 소요되면 어떻게됩니까? 2) 20-30 개의 메시지가 50-100이되면 어떻게됩니까? 3) 각 메시지는 트랜잭션입니까 (db commit 일 때)? 4) 메시지 처리 순서가 중요합니까? 5) 술집/하위 건축물이 더 적합할까요? –

+0

안녕하세요 앤드류, PFB 응답 1) 예. 이 모든 메시지를 처리하는 데 1 분 이상 걸릴 수 있습니다. 다음 20-30 개의 메시지를 매분마다 선택하여 처리해야합니다.) 2) 현재 20-30 개의 메시지를 처리해야합니다. 만약에 배치 크기가 증가한다면 우리는 이러한 많은 메시지들을 어떤 큐에 주차시킬 수 있습니까? 3) 아니요 우리는 단일 트랜잭션에서 모든 데이터베이스 작업을 커밋하고 있습니다. 4) No. 각 메시지가 독립적으로 처리 될 수 있기 때문에 처리 순서는 중요하지 않습니다. –

+0

@ SanketMurugkar이 내 대답에 도움이 되었습니까? :)하지 않으면 작동하지 않는 것을 지적 할 수 있습니까? – Sneh

답변

2

안녕하세요, Java의 ExecutorService, CountDownLatch 및 CompletableFuture를 사용하는 기본 예입니다. 이 예제는 옳은 방향으로 당신을 가리키며 절대 완벽한 것은 아니며 많은 Java8 항목을 사용합니다 (Java8을 사용한다고 가정 함). 또한 EJB Timer를 사용하지 않고 ScheduledExecutorService를 사용하고 있지만 쉽게 교환 할 수 있습니다.

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.stream.Collectors; 

public class BatchMessageProcessingExample { 

    private static final int BATCH_SIZE = 20; 

    //Having 20 here may not entirely benefit you. Chosing this number depends on a lot of stuff. 
    // Its usually better to go with total number of cores you have 
    private final ExecutorService pool = Executors.newFixedThreadPool(BATCH_SIZE); 

    private final ScheduledExecutorService databasePool = Executors.newScheduledThreadPool(1); 

    public void schedule() { 
     databasePool.scheduleWithFixedDelay(() -> runBatchProcess(), 0, 1, TimeUnit.MINUTES); //Schedule the database work to execute every minute 
    } 

    private void runBatchProcess() { 
     List<Message> taskFromDbFetch = getMessagesFromDb(); //Get stuff from the db 
     CountDownLatch countDownLatch = new CountDownLatch(taskFromDbFetch.size()); //Create a latch having same size as the list 

     List<Task> taskList = taskFromDbFetch.stream().map(x -> new Task(countDownLatch, x)).collect(Collectors.toList()); // Create tasks using the messages and the countdown latch 

     taskList.forEach(pool::execute); //Submit them all in pool 

     CompletableFuture.runAsync(() -> sendEmailAfterCompletion(countDownLatch)); //Send an email out from a separate thread 
    } 

    private void sendEmailAfterCompletion(CountDownLatch countDownLatch) { 
     try { 
      countDownLatch.await();//Await on the latch for the batch tasks to complete 
      sendEmail(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    private void sendEmail() { 
     System.out.println("Sending out an email."); 
    } 

    private List<Message> getMessagesFromDb() { //Get your messages from db here 
     List<Message> messages = new ArrayList<>(); 

     for(int i = 0; i < BATCH_SIZE; i++) { 
      final int taskNumber = i; 
      messages.add(() -> System.out.println("I am a db message number " + taskNumber)); 
     } 

     return messages; 
    } 

    class Task implements Runnable { 

     private final CountDownLatch countDownLatch; 

     private final Message message; 

     public Task(CountDownLatch countDownLatch, Message message) { 
      this.countDownLatch = countDownLatch; 
      this.message = message; 
     } 

     @Override 
     public void run() { 
      message.process(); //Process the message 
      countDownLatch.countDown(); //Countdown the latch 
     } 
    } 

    interface Message { 
     void process(); 
    } 

    public static void main(String[] args) { 
     new BatchMessageProcessingExample().schedule(); 
    } 

}