1

클라이언트 연결을 허용하는 서버 프로그램이 있습니다. 이러한 클라이언트 연결은 여러 스트림에 속할 수 있습니다. 예를 들어 둘 이상의 클라이언트가 동일한 스트림에 속할 수 있습니다. 이 스트림들 중에서 하나의 메시지는 전달해야하지만 모든 스트림이 확립 될 때까지 기다려야합니다. 이를 위해 다음과 같은 데이터 구조를 유지합니다.변수가 특정 값에 도달 할 때까지 스레드를 대기시키는 방법 (다중 스레드 Java)

ConcurrentHashMap<Integer, AtomicLong> conhasmap = new ConcurrentHashMap<Integer, AtomicLong>(); 

Integer는 스트림 ID이고 Long은 클라이언트 번호입니다. AtomicLong이 특정 값에 도달 할 때까지 주어진 스트림에 대한 하나의 스레드를 대기 시키려면 다음 루프를 사용했습니다. 실제로 스트림의 첫 번째 패킷은 스트림 ID와 대기 할 연결 수를 입력합니다. 각 연결마다 대기 연결을 줄입니다.

while(conhasmap.get(conectionID) != new AtomicLong(0)){ 
     // Do nothing 
} 

그러나이 루프는 다른 스레드를 차단합니다. 이것에 따르면 answer 그것은 휘발성 읽기 않습니다. 특정 값에 도달 할 때까지 주어진 스트림에 대해 올바른 스레드를 대기하도록 코드를 수정하려면 어떻게해야합니까?

+0

같은 출력이 왜 또한 Thread.wait()를 사용하지 않는 얻을 그것은 서버 후 일어나? 덜 우아한 해결책은 잠시 잠을 자고 값을 확인하는 것입니다. – PbxMan

+1

['Condition'] (http : // docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Condition.html) 아마도? – fge

+0

@PbxMan 서버가 이미 수락 한 스레드를 기다리고 있습니다. 예 잠시 동안 잠을 자고 값을 읽는 것이 효과적 일지 모르지만, 나는 더 나은 길을 찾는다. – user340

답변

2

귀하의 표현 : 대신 값으로, 동일하지 않습니다 객체 참조를 비교하기 때문에

conhasmap.get(conectionID) != new AtomicLong(0) 

항상 true가됩니다.

conhasmap.get(conectionID).longValue() != 0L) 

하지만 대기하지 않고 다음과 같이 반복/좋은 방법이 아닙니다 끊임없이 CPU 시간을 사용하기 때문에 루프 내에서 논리를 통지 : 더 나은 표현이 될 것이다. 대신 각 스레드는 AtomicLong 인스턴스에서 .wait()를 호출해야하며, 감소 또는 증가 할 경우 AtomicLong 인스턴스의 .notifyAll()을 호출하여 대기중인 각 스레드를 깨우고 식을 확인해야합니다. AtomicLong 클래스는 수정 될 때마다 이미 notifyAll() 메서드를 호출 할 수 있지만 확실하지는 않습니다.

감소시키고/증가 코드에서
AtomicLong al = conhasmap.get(conectionID); 
synchronized(al) { 
    while(al.longValue() != 0L) { 
     al.wait(100); //wait up to 100 millis to be notified 
    } 
} 

, 그것은 다음과 같이 표시됩니다

AtomicLong al = conhasmap.get(conectionID); 
synchronized(al) { 
    if(al.decrementAndGet() == 0L) { 
     al.notifyAll(); 
    } 
} 
당신의 잠금없는 행동 혜택되지 않기 때문에

나는 개인적으로이 카운터에 대한 AtomicLong를 사용하지 것이다 AtomicLong. 어쨌든 wait()/notify() 논리에 대한 카운터 객체를 동기화해야하기 때문에 대신 java.lang.Long을 사용하십시오.

+0

개체 참조 비교를 지적 해 주셔서 감사합니다. 그러나 솔루션은 여전히 ​​작동하지 않습니다. – user340

+0

알 자물쇠가 자물쇠로 일관성이 있다고 생각합니까? 코드가 while 루프의 무한 루프로 바뀝니다. – user340

+0

카운터가 너무 자주 감소하는 상황이 발생할 수 있습니다. 수하는 동안 상태를 변경합니다 동안 (al.longValue()> 0L) 그리고 상태가 될 경우이 : 경우 (al.decrementAndGet은() <= 0L) – Palamino

3

Java 8을 사용하는 경우 CompletableFuture이 적합 할 수 있습니다. 다음은 5 개의 클라이언트가 서버에 연결하여 메시지를 보내길 기다리는 완전하고 고안된 예제입니다 (제안/설문과 함께 BlockingQueue을 사용하여 시뮬레이션 됨).

이 예에서 예상 된 클라이언트 연결 메시지 수에 도달하면 CompletableFuture 후크가 완료되고 원하는 임의의 스레드에서 임의의 코드가 실행됩니다.

이 예제에서는 복잡한 스레드 대기/알림 설정이나 통화 대기 루프가 없습니다.

package so.thread.state; 

import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicLong; 

public class Main { 

    public static String CONNETED_MSG = "CONNETED"; 
    public static Long EXPECTED_CONN_COUNT = 5L; 

    public static ExecutorService executor = Executors.newCachedThreadPool(); 
    public static BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 

    public static AtomicBoolean done = new AtomicBoolean(false); 

    public static void main(String[] args) throws Exception { 

    // add a "server" thread 
    executor.submit(() -> server()); 

    // add 5 "client" threads 
    for (int i = 0; i < EXPECTED_CONN_COUNT; i++) { 
     executor.submit(() -> client()); 
    } 

    // clean shut down 
    Thread.sleep(TimeUnit.SECONDS.toMillis(1)); 
    done.set(true); 
    Thread.sleep(TimeUnit.SECONDS.toMillis(1)); 
    executor.shutdown(); 
    executor.awaitTermination(1, TimeUnit.SECONDS); 

    } 

    public static void server() { 

    print("Server started up"); 
    // track # of client connections established 
    AtomicLong connectionCount = new AtomicLong(0L); 

    // at startup, create my "hook" 
    CompletableFuture<Long> hook = new CompletableFuture<>(); 
    hook.thenAcceptAsync(Main::allClientsConnected, executor); 

    // consume messages 
    while (!done.get()) { 
     try { 
     String msg = queue.poll(5, TimeUnit.MILLISECONDS); 
     if (null != msg) { 
      print("Server received client message"); 
      if (CONNETED_MSG.equals(msg)) { 
      long count = connectionCount.incrementAndGet(); 

      if (count >= EXPECTED_CONN_COUNT) { 
       hook.complete(count); 
      } 
      } 
     } 

     } catch (Exception e) { 
     e.printStackTrace(); 
     } 
    } 

    print("Server shut down"); 

    } 

    public static void client() { 
    queue.offer(CONNETED_MSG); 
    print("Client sent message"); 
    } 

    public static void allClientsConnected(Long count) { 
    print("All clients connected, count: " + count); 
    } 


    public static void print(String msg) { 
    System.out.println(String.format("[%s] %s", Thread.currentThread().getName(), msg)); 
    } 
} 

새 연결을 받아이

[pool-1-thread-1] Server started up 
[pool-1-thread-5] Client sent message 
[pool-1-thread-3] Client sent message 
[pool-1-thread-2] Client sent message 
[pool-1-thread-6] Client sent message 
[pool-1-thread-4] Client sent message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-1] Server received client message 
[pool-1-thread-4] All clients connected, count: 5 
[pool-1-thread-1] Server shut down 
+0

미리 클라이언트 연결 수를 알지 못합니다. 많은 연결이있을 수 있으며 이러한 연결에는 서로 다른 스트림이 있습니다. 특정 스트림에서 기다려야합니다. – user340

+0

@kani, 이것은 고의적 인 예입니다. 어떤 종류의 로직 (당신이 지금 가지고있는 것과 똑같은 biz 로직)으로 이런 종류의 패턴을 사용할 수 있고'CompletableFuture.complete (..) '를 사용할 수 있습니다. Java가 복잡한 비동기 동작을 구성하기 위해 도입 한 새로운 패러다임을 보여 주어 Object.wait() 및 Object.notify()를 추론합니다. – Alex

+0

내 클라이언트 스레드는 통신 ID를 기준으로 그룹화됩니다. 예를 들어, 15 개의 클라이언트 연결 10이 하나의 통신 A에 속하고 5가 통신 B에 속하는 경우, A는 10 개의 스레드를, B는 5 개의 스레드를 기다려야합니다. CompletableFuture에서는 가능합니까? 스레드가 executor와 함께 수집되어야하고 스레드를 시작할 때 통신 스트림을 구별 할 수 없습니다. – user340