2017-01-31 9 views
3

나는 맵을 채우는 클래스를 가지고있다. 하나의 백그라운드 스레드에서 매 30 초마다 하나의 백그라운드 스레드로부터 getNextSocket을 얻는다.이 맵핑을 얻기 위해 여러 개의 리더 스레드가 호출 할 것이다. 이 정보.단일 스레드에서 해시 맵을 수정하고 여러 스레드에서 읽기?

public class SocketManager { 
    private static final Random random = new Random(); 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new HashMap<>(); 
    private final ZContext ctx = new ZContext(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketManager instance = new SocketManager(); 
    } 

    public static SocketManager getInstance() { 
    return Holder.instance; 
    } 

    private SocketManager() { 
    connectToZMQSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateLiveSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    private void connectToZMQSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 
    for (Map.Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets); 
    } 
    } 

    private List<SocketHolder> connect(Datacenters colo, List<String> addresses, int socketType) { 
    List<SocketHolder> socketList = new ArrayList<>(); 
    for (String address : addresses) { 
     try { 
     Socket client = ctx.createSocket(socketType); 
     // Set random identity to make tracing easier 
     String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt()); 
     client.setIdentity(identity.getBytes(ZMQ.CHARSET)); 
     client.setTCPKeepAlive(1); 
     client.setSendTimeOut(7); 
     client.setLinger(0); 
     client.connect(address); 

     SocketHolder zmq = new SocketHolder(client, ctx, address, true); 
     socketList.add(zmq); 
     } catch (Exception ex) { 
     // log error 
     } 
    } 
    return socketList; 
    } 

    // this method will be called by multiple threads to get the next live socket 
    public Optional<SocketHolder> getNextSocket() { 
    Optional<SocketHolder> liveSocket = Optional.absent(); 
    List<Datacenters> dcs = Datacenters.getOrderedDatacenters(); 
    for (Datacenters dc : dcs) { 
     liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc)); 
     if (liveSocket.isPresent()) { 
     break; 
     } 
    } 
    return liveSocket; 
    } 

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { 
    if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     Collections.shuffle(listOfEndPoints); 
     for (SocketHolder obj : listOfEndPoints) { 
     if (obj.isLive()) { 
      return Optional.of(obj); 
     } 
     } 
    } 
    return Optional.absent(); 
    } 

    private void updateLiveSockets() { 
    Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 

    for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
     List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey()); 
     List<SocketHolder> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketHolder liveSocket : liveSockets) { 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 
     Map<byte[], byte[]> holder = populateMap(); 

     boolean status = SendToSocket.getInstance().execute(3, holder, socket); 
     boolean isLive = (status) ? true : false; 
     SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(zmq); 
     } 
     liveSocketsByDatacenter.put(entry.getKey(), liveUpdatedSockets); 
    } 
    } 
} 

당신이 내 위 클래스에서 볼 수 있듯이 :

  • 를 30 초마다 실행되는 단일 백그라운드 스레드에서, 나는 모든 라이브 소켓 liveSocketsByDatacenter지도를 채 웁니다.
  • 그리고 여러 스레드에서 getNextSocket 메서드를 호출하여 liveSocketsByDatacenter 맵을 사용하여 필요한 정보를 얻을 수있는 라이브 소켓을 제공합니다.

내 위 코드가 안전하며 모든 판독기 스레드에 liveSocketsByDatacenter이 정확하게 표시됩니까? 단일 배경 스레드에서 매 30 초마다 liveSocketsByDatacenter 맵을 수정하고 나서 많은 판독기 스레드에서 수정하고 있으므로 getNextSocket 메서드를 호출하므로 여기에 잘못된 것이 있는지 확실하지 않습니다.

내 "getLiveSocket"메소드에 스레드 안전성 문제가있는 것처럼 보일 때마다 모든 읽기가 공유 된 ArrayList을 맵 밖으로 가져 와서 셔플 한 것처럼 보입니까? 그리고 제가 그리워 할 수도있는 장소가 몇 개 더 없을 수도 있습니다. 내 코드에서 이러한 스레드 안전 문제를 해결하는 가장 좋은 방법은 무엇입니까?

더 좋은 방법이 있다면이 내용을 다시 작성하면됩니다.

답변

1

스레드로부터 안전하려면 코드가 공유 가능한 모든 공유 상태에 대한 액세스를 동기화해야합니다. 여기

는 임의 액세스를 동기화없이 liveSocketsByDatacenter (connectToZMQSocketsupdateLiveSockets 의해) HashMap (updateLiveSocketsgetNextSocket 의해) 잠재적으로 동시에 판독 될 수있는 Map비 스레드 안전 구현 및 변성의 인스턴스를 공유 이미 코드를 스레드로부터 안전하지 않게 만들 수 있습니다. 또한, 본 Map의 값 ArrayList (getNextSocketupdateLiveSockets 의해)과 같은 잠재적으로 동시에 판독 될 수있는 List비 스레드 안전 구현 및 변성의 인스턴스 (Collections.shuffle하여보다 정확하게 getLiveSocket 의해)이다. 대한

  1. 대신 HashMapConcurrentHashMap를 사용하여 변수 liveSocketsByDatacenter 그것은 기본적 Map의 안전 구현을 스레드로 :에

    당신이 스레드 안전 문제를 해결하는 간단한 방법이 될 수 있습니다.

  2. 버전을 ArrayList 인스턴스를 Collections.unmodifiableList(List<? extends T> list)을 사용하여지도의 값으로 넣으면 목록이 변경되지 않으므로 스레드로부터 안전합니다. 예를 들어

:

liveSocketsByDatacenter.put(
    entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets) 
);` 
목록에 직접 Collections.shuffle를 호출 피하기 위해 방법 getLiveSocket를 다시
  • , 당신은 예를 들어 대신 라이브 소켓의 목록을 셔플 수있는
      (예 : new ArrayList<>(listOfEndPoints)) 목록 자체 대신 목록의 사본을 사용하십시오. 예를 들어

    : 자주 읽을 거의 (한 번 30 초마다)지도를 수정하는 것 같은


    는 # 1를 들어, 당신이 고려할 수

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) { 
        if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
         // The list of live sockets 
         List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size()); 
         for (SocketHolder obj : listOfEndPoints) { 
          if (obj.isLive()) { 
           liveOnly.add(obj); 
          } 
         } 
         if (!liveOnly.isEmpty()) { 
          // The list is not empty so we shuffle it an return the first element 
          Collections.shuffle(liveOnly); 
          return Optional.of(liveOnly.get(0)); 
         } 
        } 
        return Optional.absent(); 
    } 
    
    다음 주지도를 재건 30 초마다 불변 버전 (Collections.unmodifiableMap(Map<? extends K,? extends V> m) 사용)을 사용하는 경우지도의 콘텐츠에 액세스하기 위해 더 이상 동기화 메커니즘의 가격을 지불하지 않으므로 대부분이 읽기 시나리오에서 매우 효율적입니다.

    귀하의 코드는 다음과 같다 :

    // Your variable is no more final, it is now volatile to ensure that all 
    // threads will see the same thing at all time by getting it from 
    // the main memory instead of the CPU cache 
    private volatile Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter 
        = Collections.unmodifiableMap(new HashMap<>()); 
    
    private void connectToZMQSockets() { 
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 
        // The map in which I put all the live sockets 
        Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>(); 
        for (Map.Entry<Datacenters, ImmutableList<String>> entry : 
         socketsByDatacenter.entrySet()) { 
    
         List<SocketHolder> addedColoSockets = connect(
          entry.getKey(), entry.getValue(), ZMQ.PUSH 
         ); 
         liveSockets.put(entry.getKey(), Collections.unmodifiableList(addedColoSockets)); 
        } 
        // Set the new content of my map as an unmodifiable map 
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSockets); 
    } 
    
    public Optional<SocketHolder> getNextSocket() { 
        // For the sake of consistency make sure to use the same map instance 
        // in the whole implementation of my method by getting my entries 
        // from the local variable instead of the member variable 
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
         this.liveSocketsByDatacenter; 
        ... 
    } 
    ... 
    // Added the modifier synchronized to prevent concurrent modification 
    // it is needed because to build the new map we first need to get the 
    // old one so both must be done atomically to prevent concistency issues 
    private synchronized void updateLiveSockets() { 
        // Initialize my new map with the current map content 
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
         new HashMap<>(this.liveSocketsByDatacenter); 
        Map<Datacenters, ImmutableList<String>> socketsByDatacenter = Utils.SERVERS; 
        // The map in which I put all the live sockets 
        Map<Datacenters, List<SocketHolder>> liveSockets = new HashMap<>(); 
        for (Entry<Datacenters, ImmutableList<String>> entry : socketsByDatacenter.entrySet()) { 
         ... 
         liveSockets.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets)); 
        } 
        // Set the new content of my map as an unmodifiable map 
        this.liveSocketsByDatacenter = Collections.unmodifiableMap(liveSocketsByDatacenter); 
    } 
    

    귀하의 필드 liveSocketsByDatacenter도 유형 AtomicReference<Map<Datacenters, List<SocketHolder>>>이 될 수있다, 그 다음 final 것,지도는 여전히 변수 volatile뿐만 클래스 AtomicReference에 저장됩니다.

    앞의 코드는 다음과 같다 : 사용

    private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter 
        = new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>())); 
    
    ... 
    
    private void connectToZMQSockets() { 
        ... 
        // Update the map content 
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSockets)); 
    } 
    
    public Optional<SocketHolder> getNextSocket() { 
        // For the sake of consistency make sure to use the same map instance 
        // in the whole implementation of my method by getting my entries 
        // from the local variable instead of the member variable 
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
         this.liveSocketsByDatacenter.get(); 
        ... 
    } 
    
    // Added the modifier synchronized to prevent concurrent modification 
    // it is needed because to build the new map we first need to get the 
    // old one so both must be done atomically to prevent concistency issues 
    private synchronized void updateLiveSockets() { 
        // Initialize my new map with the current map content 
        Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = 
         new HashMap<>(this.liveSocketsByDatacenter.get()); 
        ... 
        // Update the map content 
        this.liveSocketsByDatacenter.set(Collections.unmodifiableMap(liveSocketsByDatacenter)); 
    } 
    
  • +0

    좋은 제안. 나는 모든 것을 이해했다. 앞서 언급 한 것처럼 동기화 된 블록을 제거 할 수 있도록 AtomicReference를 사용하기로 결정했습니다. 나는 당신의 제안에 대한 모든 것을 올바르게 파악할 수 있도록 요지를 둡니다. (https://gist.github.com/TechGeeky/ae9f3b8863710a67c1d68a21c74c24da) 이제 우리는'AtomicReference'를 사용하고 있기 때문에, 많은 장소에서'AtomicReference'에'get()'메소드를 사용하여 실제 Map을 얻을 필요가 있습니다. 나는 그 것들을 나의 요지에서 편집했다. 내가 모든게 옳다는 것을 알려줘. –

    +0

    @ user5447339 덮어 쓰지 않는 지점 # 3을 제외하고는 좋은 소리가납니다. –

    +0

    죄송합니다. 나는 어떻게 든 그것을 놓쳤습니다. 다시 요점을 다시 편집했습니다.지도가 휘발성 인 경우 왜 동기화 된 블록이 필요하지 않은지 설명 할 수 있습니까? 또한'ConcurrentHashMap'을 사용하지 않는 것의 이점은 불변 버전에서'HashMap'을 사용하면 성능 이점을 얻을 수 있기 때문입니다. 내 이해를 위해서 왜 이것이 더 유익한 지 설명 할 수 있겠습니까? –

    1

    안전하게 대신 ConcurrentHashMap을 사용할 수 있습니다. 정상적으로 작동합니다.

    getNextSocket, connectToZMQSocketsupdateLiveSockets (사방이 업데이트하거나 HashMap의 읽기)하는 방법이나 다른 잠금 전에 sychronized 단어처럼 : 일반의 HashMap을 사용하여 현재의 접근 방식에서

    는, 당신은 방법의 동기화가 필요 이러한 모든 방법에 공통적 인 모니터 - 이것은 ConcurrentModificationException 때문에 발생하지는 않습니다 만 동기화가 없으면 읽기 스레드가 업데이트되지 않은 값을 볼 수 있습니다. 당신이 읽을 수있는 것처럼

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> endPoints) { 
        List<SocketHolder> listOfEndPoints = new ArrayList<SocketHolder>(endPoints); 
        if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
    
         Collections.shuffle(listOfEndPoints); 
         for (SocketHolder obj : listOfEndPoints) { 
         if (obj.isLive()) { 
          return Optional.of(obj); 
         } 
         } 
        } 
        return Optional.absent(); 
        } 
    
    +0

    '사방 synchronized' 가장 우아한 솔루션입니다. – Kayaman

    +0

    @ 카야 만 여기 CHM을 사용하면 잘 될 것입니다. –

    +0

    @ user5447339 될 것으로 보입니다. 나는 명백한 경기 조건이나 다른 문제를 보지 못합니다. – Kayaman

    1

    을 다음 getLiveSocket의 동시 변경에 문제도있다

    는이 문제를 방지하는 가장 간단한 방법 중 하나는 다음과 같이, 셔플 전에 새 목록에 listOfEndpoints을 복사하는 것입니다 세부 사항 예 here 여러 스레드가 동시에 해시 맵에 액세스하고 스레드 중 하나 이상이 구조적으로 맵을 수정하는 경우 일관성없는 내용보기를 피하기 위해 외부 동기화해야합니다. 스레드 안전성을 유지하려면 Java Collections synchronizedMap() 메소드 또는 ConcurrentHashMap을 사용해야합니다.

    //synchronizedMap 
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = Collections.synchronizedMap(new HashMap<Datacenters, List<SocketHolder>>());  
    

    또는

    //ConcurrentHashMap 
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<Datacenters, List<SocketHolder>>(); 
    

    당신은 매우 높은 동시 응용 프로그램 수정 및 다른 스레드에서 읽기 키 값을 가지고

    , 당신은 또한 생산자 - 소비자의 원칙에서, 예를 들어,보고해야 here.

    +0

    입니다. 내 "getLiveSocket"메서드에서 스레드 안전 문제가있는 것처럼 보입니다. 모든 읽기는 공유 ArrayList를지도에서 가져 와서 뒤섞습니다. 그리고 나머지는 거의 없을 수도 있습니다. 어떤 생각을 어떻게 고칠 수 있습니까? –

    -1

    ConcurrentHashMap을 사용하면 코드가 스레드 안전을 유지해야합니다. 또는 synchronized 메소드를 사용하여 기존 해시 맵에 액세스하십시오.