2

add 메서드는 스레드 안전 방식으로 channelMessageHolder CHM을 채우는 다중 스레드에 의해 호출됩니다. 같은 클래스에서 맵을 쓰레드 방식으로 채우고 그 맵을 백그라운드 스레드의 다른 메소드로 전달 하시겠습니까?

, 나는 30 초마다 실행되는 backgrond에 스레드를 가지고 있고 그것은 channelMessageHolder에서 데이터를 전달하여 send 메소드를 호출합니다.

public class Processor { 
    private final ScheduledExecutorService executorService = Executors 
     .newSingleThreadScheduledExecutor(); 
    private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder = 
       new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>()); 

    private Processor() { 
    executorService.scheduleAtFixedRate(new Runnable() { 
     @Override 
     public void run() { 
     send(); 
     } 
    }, 0, 30, TimeUnit.SECONDS); 
    } 

    // this will be called by only single background thread 
    private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) { 
    for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) { 
     Channel channel = entry.getKey(); 
     ConcurrentLinkedQueue<Message> messageHolder = entry.getValue(); 

     while (!messageHolder.isEmpty()) { 
     Message message = messageHolder.poll(); 
     .... 
     // process this and send to database 
     }  
    } 
    } 

    // called by multiple threads 
    public void add(final Channel channel, final Message message) { 
    // populate channelMessageHolder in a thread safe way 
    } 
} 

질문

channelMessageHolder 이미 내 Processor 클래스에 존재 볼 수 있듯이 그래서 명시 적으로 30 초마다이 방법을 보내려면이 맵에서 데이터를 전달해야합니까? 또는 직접 보내기 메서드에서 사용할 수 있습니까?

혼란은 직접 보내기 메서드에서 사용하면 동시에 다중 스레드로 채워집니다. 그래서 send 메서드로 전달하기 위해 AtomicReference의 getAndSet 메서드를 사용하고 있습니다.

내가하는 일이 잘못되었다는 사실을 알려주고 더 좋은 방법이 있습니까?

답변

1

channelMessageHolder가 내 프로세서 클래스에 이미 존재하므로 30 초마다 메서드에서 데이터를 명시 적으로 전달해야합니까? 또는 직접 보내기 메서드에서 사용할 수 있습니까?

당신은 확실히 send() 방법에서 직접 사용할 수 있으며 ConcurrentHashMap가 이미 동기화되어 있기 때문에 당신은 AtomicReference 래퍼가 필요하지 않습니다. 걱정할 필요가있는 것은지도의 키와 값 개체가 제대로 동기화되고 있다는 것입니다. 나는 Channel이 불변이고 ConcurrentLinkedQueue이 동시라고 가정하므로 잘해야합니다. 생산자 스레드가 보낸 스레드가 충돌없이 항목을 보내는 동시에 그것으로 항목을 추가 할 수 있도록

// no need for AtomicReference 
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder = 
    new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>(); 

ConcurrentHashMap 당신을위한 동기화 처리합니다. AtomicReference은 여러 스레드간에 동기화되지 않은 클래스를 공유하려는 경우에만 필요합니다.

혼란은 내가 직접 내 send 메소드에서 사용한다면, 동시에 여러 스레드에 의해 채워질 것이다. 그래서 나는 메소드를 보내기 위해 AtomicReference의 getAndSet 메소드를 사용하고있다.

그래도 괜찮습니다. 여러 스레드가 ConcurrentLinkedQueue이라는 메시지를 추가합니다. 백그라운드 스레드가 시작될 때마다 30 초마다 Channel이 대기열에서 제외되고 그 순간 대기열에있는 메시지가 전송됩니다. ConcurrentLinkedQueue은 생산자와 소비자의 경쟁 조건을 보호합니다. 하나 개의 스레드 디큐있을 보이기 때문에 그것은 당신의 경우에 작동

while (!messageHolder.isEmpty()) { 
    Message message = messageHolder.poll(); 

:

코드에서이 문제는 큐를 여러 번 호출에 의존하기 때문에이 재진입되지 않는 것입니다 하지만 다음 코드가 더 좋습니다 :

while (true) { 
    // only one call to the concurrent queue 
    Message message = messageHolder.poll(); 
    if (message == null) { 
     break; 
    } 
    ... 
} 
+0

원자 참조가 필요없는 이유를 설명 할 수 있습니까? 동일한 CHM에서 동일한 데이터를보기 위해 모든 스레드가 필요하다면 그럴 필요가 있다고 생각했습니다. 또한 나는 (업데이트 섹션에서) 사용하고있는 전체 코드로 내 질문을 업데이트했다. 나는 단지 내가 올바른 코드를 얻고 있는지 확인하여 내가 사용할 전체 코드를 확인하려고했다. 병렬로 여러 채널을 실행 중이며 최대 5-6 채널을 갖습니다. 그리고 채널은 내 코드에서 열거 형 클래스입니다. – user1950349

+0

또한 'while true'를 사용하면 영원히 계속 실행됩니다. 그런 경우 30 초마다 실행되는 백그라운드 스레드가 필요하지 않습니까? 내 경우에는 CHM이 언제든지 비어있을 것입니다. 항상 가득 차서 매 30 초마다 실행되지만 무한 루프로 계속 실행되므로 영원히 계속 실행되고 다음 백그라운드 스레드는 실행되지 않습니다. 나는 추측했다. – user1950349

+0

동기화되지 않은 객체를 공유하려고한다면 'AtomicReference'가 @ user1950349 필요합니다. CHM은 이미 내부적으로 동기화되어 있기 때문에 데이터를 직접 동기화 할 필요없이 여러 스레드가 공유 할 수 있습니다. 표준'HashMap'을 사용했다면'AtomicReference'를 사용해야 할 것입니다. – Gray

1

아니면 내가 직접 문제없이 send 방법의 시작 부분에 channelMessageHolder.getAndSet(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>())을 말함으로써 당신은 직접 send 방법을 사용할 수 있어야합니다 아무것도

을 통과하지 않고 내 send 메소드에서 사용할 수 있습니다 . 말했다

, 자바 8 당신이 정말로 안 사용되는 AtomicReference 것을해야한다는 의미 ConcurrentHashMap 클래스에 computeIfAbsent라는 새로운 방법을 추가했습니다.

+0

나는 여전히 'computeIfAbsent'를 사용할 수 없으므로 여전히 Java 7입니다. 또한 경쟁 조건은 무엇이며 어떻게 해결할 수 있습니까? 그리고 내 보내기 메서드는 하나의 배경 스레드에 의해서만 호출됩니다 .. – user1950349

+0

CLQ 대신 여기에 다른 데이터 구조를 사용하고 싶습니까? 스레드 안전 때문에 사용하기 시작했습니다 – user1950349

+0

@ user1950349 제 잘못입니다. 나는'newSingleThreadScheduledExecutor'를'newScheduledThreadPool'으로 잘못 읽었습니다. 나는 그런 상황에있을 것이라고 말한 시점에서 경쟁 조건이 없을 것이다. 스레드 안전성은 복잡한 주제이므로 달성하고자하는 것이 무엇인지 이해하지 않고서는 더 이상의 조언을하기가 어려울 수 있습니다. 즉, 대신 BlockingQueue 구현 중 하나를 사용할 수 있습니다. – CKing