2016-12-11 4 views
0

BlockingQueue<E> 인스턴스의 콜렉션이 주어진 경우 전체적으로 take()poll() 블로킹을 구현하는 가장 효율적인 방법은 무엇입니까? 여기에 코드입니다 :이 클래스의 notFull 그냥 Condition의 샘플 사용에 같은 notEmpty 조건 만/제공 방법을 추가 무시하고 발생하게 별도의 클래스에서 각 소스 큐 포장 계획이었다블로킹 큐의 "공용체"구현

class UnionBlockingQueue<E> implements BlockingQueue<E> { 
    private final Collection<BlockingQueue<E>> sources; 

    public UnionBlockingQueue(Collection<BlockingQueue<E>> sources) { 
    this.sources = sources; 
    } 


    @Override 
    public E take() throws InterruptedException { 
    // takes first available E from the sources 
    } 

    @Override 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
    // polls first available E from the sources 
    } 

    // The rest is unsupported/irrelevant/out of scope 
    @Override 
    public boolean add(E e) { 
    throw new UnsupportedOperationException(); 
    } 

    @Override 
    public boolean offer(E e) { 
    throw new UnsupportedOperationException(); 
    } 
} 

는, Java 대기열에 익숙하지 않아서 더 좋고/더 안전하고 효율적인 접근 방식 또는 이미 설치된 라이브러리가 있다고 생각했습니다.

+0

'offer', 당신은 단순히 모든 생산자 단일 큐를 손 수없는 이유는 무엇입니까? – teppic

+0

@teppic 왜냐하면 모든 사람이 무시할 수 있지만 모두가 응용 프로그램의 디자인을 변경할 수있는 위치에 있지 않기 때문에 여기에있는 모든 큐는 컬렉션입니다. – Osw

+0

어쩌면 나는 당신이 "오버라이드"란 뜻을 오해하고 있습니다. 생성자 사용자가 큐 인스턴스의 유형 또는 인스턴스화를 제어합니까? – teppic

답변

2

가장 단순한은 서브 대기열을 SynchronousQueue으로 파이프하는 것입니다. 원본 큐 포장과`add`과를 오버라이드 (override) 할 수있는 위치에 있다면

public static void main(String[] args) throws InterruptedException { 

    // Input queues 
    LinkedBlockingDeque<String> q1 = new LinkedBlockingDeque<>(); 
    LinkedBlockingDeque<String> q2 = new LinkedBlockingDeque<>(); 
    LinkedBlockingDeque<String> q3 = new LinkedBlockingDeque<>(); 
    List<LinkedBlockingDeque<String>> qs = Arrays.asList(q1, q2, q3); 

    // Output queue 
    SynchronousQueue<String> combined = new SynchronousQueue<>(true); 

    // Pipe logic 
    Executor executor = Executors.newCachedThreadPool(r -> { 
     Thread t = new Thread(r, "q pipe"); 
     t.setDaemon(true); 
     return t; 
    }); 

    for (LinkedBlockingDeque<String> q : qs) { 
     executor.execute(() -> { 
      try { 
       while (!Thread.currentThread().isInterrupted()) { 
        combined.put(q.take()); 
       } 
      } catch (InterruptedException e) { 
       // done 
      } 
     }); 
    } 

    // Test 
    q1.put("foo"); 
    q2.put("bar"); 
    q3.put("baz"); 

    String e; 
    while ((e = combined.poll(100, TimeUnit.MILLISECONDS)) != null) { 
     System.out.println(e); 
    } 
} 
+0

SynchronousQueue에 대한 아이디어를 제공해 주셔서 감사합니다. 그러나 executor를 사용하는 것은 필자의 경우에는 가능한 모든 리소스를 사용하거나 공유 집행자의 경우 성능이 좋지 않을 수 있기 때문에 피하려고 노력했다. 미안하지만, 수십개의 유니언 인스턴스를 각각 수십 개의 소스 큐로 언급해야했습니다. – Osw