2014-11-10 3 views
3
나는 다음과 같은 사용 사례가

의 수보다 훨씬 높다 :봄 원자로 혜택 게시자 스레드 수는 소비자

  • N 스레드 (N 10 1000 스레드에 이르기까지 다양) 데이터를 게시를, 그 스레드를 만들 수 있습니다 HTTP 요청, jdbc 호출, 로컬 자바 컴퓨터 만 사용하는 순수 자바 처리
  • 1 to M 스레드가 IO (HTTP 요청 보내기, 대량 쓰기 가능)를 사용하면 해당 스레드가 게시자의 속도를 저하 시켜서는 안됩니다. M은 10 개를 초과해서는 안됩니다.

N 스레드가 소비자가 소비하는 것보다 훨씬 빨리 데이터를 게시 할 수 있지만 게시자의 속도 저하를 최소화하는 것이 좋습니다.

게시자가 작성하고 대기열의 데이터를 가져 와서 처리하는 ArrayBlockingQueue를 기반으로 한 접근 방식을 구현했지만 결과는 좋지 않습니다.

그래서 나는 반응기 패턴, 특히 스프링 반응기를 연구하여 나의 유스 케이스에 대한 반응인지 알아 봅니다. 그럴까요?

내가 읽어

이 필요하십니까?

답변

4

는 소리가 난다. 정상적인 Queue 구현이지만 지속성, 장애 극복 및 재생 기능을 위해 Chronicle Queue를 사용합니다. 또한 매우 빠르고 매우 빠릅니다.

기본적으로 게시자는 한 쪽에서 PersistentQueue로 데이터를 밀어 넣고 다른 쪽에서는 끌어 당기는 구독자 집합을 갖게됩니다. 이미 Queue을 사용하고 있다면 현재 사용중인 것을 대신 할 수 있습니다.

기본 사용 패턴을 보여주기 위해 위키 페이지를 작성해야합니다.

+0

답장을 보내 주셔서 감사합니다. 예, 위키 페이지가 훌륭하다고 생각합니다. 라이브러리의 새로운 사용자로서 유망한 것으로 들리 겠지만 제 견해로는 현재 문서가 부족합니다. 일부 개념은이 필드의 초보자를 위해 설명되어야하며 일부는 스레드로부터 안전하고 그렇지 않은 부분에 중점을 둡니다. – pmpm

1

사용자 지정 컨테이너 클래스를 사용하여 유사한 문제를 처리했습니다. 누적 된 모든 객체를 하나의 잠금없는 동작으로 읽을 수있는 CAS 객체를 통해 이중 버퍼링 방법을 사용합니다.

나는 그것이 얼마나 효율적인지 전혀 모르지만 단순함은 좋은 것들과 함께 거기에 있도록해야합니다.

아래 코드의 대부분은 테스트 코드이므로 기능에 영향을주지 않고 //TESTING 주석 아래의 모든 코드를 제거 할 수 있습니다. 당신이 원자로의 PersistentQueue facility을보고 그에서 구독자에서 게시자를 분리 할 수 ​​있습니다처럼

/** 
* Lock free - thread-safe. 
* 
* Write from many threads - read with fewer threads. 
* 
* Write items of type T. 
* 
* Read items of type List<T>. 
* 
* @author OldCurmudgeon 
* @param <T> - Th etype we plan to write/read. 
*/ 
public class DoubleBufferedList<T> { 

    /** 
    * Atomic reference so I can atomically swap it through. 
    * 
    * Mark = true means I am adding to it so momentarily unavailable for iteration. 
    */ 
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false); 

    // Factory method to create a new list - may be best to abstract this. 
    protected List<T> newList() { 
     return new ArrayList<>(); 
    } 

    /** 
    * Get and replace the current list. 
    * 
    * Used by readers. 
    * 
    * @return List<T> of a number (possibly 0) of items of type T. 
    */ 
    public List<T> get() { 
     // The list that was there. 
     List<T> it; 
     // Replace an unmarked list with an empty one. 
     if (!list.compareAndSet(it = list.getReference(), newList(), false, false)) { 
      // Mark was not false - Failed to replace! 
      // It is probably marked as being appended to but may have been replaced by another thread. 
      // Return empty and come back again soon. 
      return Collections.<T>emptyList(); 
     } 
     // Successfull replaced an unmarked list with an empty list! 
     return it; 
    } 

    /** 
    * Grab and lock the list in preparation for append. 
    * 
    * Used by add. 
    */ 
    private List<T> grab() { 
     List<T> it; 
     // We cannot fail so spin on get and mark. 
     while (!list.compareAndSet(it = list.getReference(), it, false, true)) { 
      // Spin on mark - waiting for another grabber to release (which it must). 
     } 
     return it; 
    } 

    /** 
    * Release the grabbed list. 
    * 
    * Opposite of grab. 
    */ 
    private void release(List<T> it) { 
     // Unmark it - should this be a compareAndSet(it, it, true, false)? 
     if (!list.attemptMark(it, false)) { 
      // Should never fail because once marked it will not be replaced. 
      throw new IllegalMonitorStateException("It changed while we were adding to it!"); 
     } 
    } 

    /** 
    * Add an entry to the list. 
    * 
    * Used by writers. 
    * 
    * @param entry - The new entry to add. 
    */ 
    public void add(T entry) { 
     List<T> it = grab(); 
     try { 
      // Successfully marked! Add my new entry. 
      it.add(entry); 
     } finally { 
      // Always release after a grab. 
      release(it); 
     } 
    } 

    /** 
    * Add many entries to the list. 
    * 
    * @param entries - The new entries to add. 
    */ 
    public void add(List<T> entries) { 
     List<T> it = grab(); 
     try { 
      // Successfully marked! Add my new entries. 
      it.addAll(entries); 
     } finally { 
      // Always release after a grab. 
      release(it); 
     } 
    } 

    /** 
    * Add a number of entries. 
    * 
    * @param entries - The new entries to add. 
    */ 
    @SafeVarargs 
    public final void add(T... entries) { 
     // Make a list of them. 
     add(Arrays.<T>asList(entries)); 
    } 

    // TESTING. 
    // How many testers to run. 
    static final int N = 10; 
    // The next one we're waiting for. 
    static final AtomicInteger[] seen = new AtomicInteger[N]; 
    // The ones that arrived out of order. 
    static final ConcurrentSkipListSet<Widget>[] queued = Generics.<ConcurrentSkipListSet<Widget>>newArray(N); 

    static class Generics { 

     // A new Generics method for when we switch to Java 7. 
     @SafeVarargs 
     static <E> E[] newArray(int length, E... array) { 
      return Arrays16.copyOf(array, length); 
     } 
    } 

    static { 
     // Populate the arrays. 
     for (int i = 0; i < N; i++) { 
      seen[i] = new AtomicInteger(); 
      queued[i] = new ConcurrentSkipListSet<>(); 
     } 
    } 

    // Thing that is produced and consumed. 
    private static class Widget implements Comparable<Widget> { 

     // Who produced it. 
     public final int producer; 
     // Its sequence number. 
     public final int sequence; 

     public Widget(int producer, int sequence) { 
      this.producer = producer; 
      this.sequence = sequence; 
     } 

     @Override 
     public String toString() { 
      return producer + "\t" + sequence; 
     } 

     @Override 
     public int compareTo(Widget o) { 
      // Sort on producer 
      int diff = Integer.compare(producer, o.producer); 
      if (diff == 0) { 
       // And then sequence 
       diff = Integer.compare(sequence, o.sequence); 
      } 
      return diff; 
     } 
    } 

    // Produces Widgets and feeds them to the supplied DoubleBufferedList. 
    private static class TestProducer implements Runnable { 

     // The list to feed. 
     final DoubleBufferedList<Widget> list; 
     // My ID 
     final int id; 
     // The sequence we're at 
     int sequence = 0; 
     // Set this at true to stop me. 
     public volatile boolean stop = false; 

     public TestProducer(DoubleBufferedList<Widget> list, int id) { 
      this.list = list; 
      this.id = id; 
     } 

     @Override 
     public void run() { 
      // Just pump the list. 
      while (!stop) { 
       list.add(new Widget(id, sequence++)); 
      } 
     } 
    } 

    // Consumes Widgets from the suplied DoubleBufferedList 
    private static class TestConsumer implements Runnable { 

     // The list to bleed. 
     final DoubleBufferedList<Widget> list; 
     // My ID 
     final int id; 
     // Set this at true to stop me. 
     public volatile boolean stop = false; 

     public TestConsumer(DoubleBufferedList<Widget> list, int id) { 
      this.list = list; 
      this.id = id; 
     } 

     @Override 
     public void run() { 
      // The list I am working on. 
      List<Widget> l = list.get(); 
      // Stop when stop == true && list is empty 
      while (!(stop && l.isEmpty())) { 
       // Record all items in list as arrived. 
       arrived(l); 
       // Grab another list. 
       l = list.get(); 
      } 
     } 

     private void arrived(List<Widget> l) { 
      for (Widget w : l) { 
       // Mark each one as arrived. 
       arrived(w); 
      } 
     } 

     // A Widget has arrived. 
     private static void arrived(Widget w) { 
      // Which one is it? 
      AtomicInteger n = seen[w.producer]; 
      // Don't allow multi-access to the same producer data or we'll end up confused. 
      synchronized (n) { 
       // Is it the next to be seen? 
       if (n.compareAndSet(w.sequence, w.sequence + 1)) { 
        // It was the one we were waiting for! See if any of the ones in the queue can now be consumed. 
        for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) { 
         Widget it = i.next(); 
         // Is it in sequence? 
         if (n.compareAndSet(it.sequence, it.sequence + 1)) { 
          // Done with that one too now! 
          i.remove(); 
         } else { 
          // Found a gap! Stop now. 
          break; 
         } 
        } 
       } else { 
        // Out of sequence - Queue it. 
        queued[w.producer].add(w); 
       } 
      } 
     } 
    } 

    // Main tester 
    public static void main(String args[]) { 
     try { 
      System.out.println("DoubleBufferedList:Test"); 
      // Create my test buffer. 
      DoubleBufferedList<Widget> list = new DoubleBufferedList<>(); 
      // All running threads - Producers then Consumers. 
      List<Thread> running = new LinkedList<>(); 
      // Start some producer tests. 
      List<TestProducer> producers = new ArrayList<>(); 
      for (int i = 0; i < N; i++) { 
       TestProducer producer = new TestProducer(list, i); 
       Thread t = new Thread(producer); 
       t.setName("Producer " + i); 
       t.start(); 
       producers.add(producer); 
       running.add(t); 
      } 

      // Start the same number of consumers (could do less or more if we wanted to). 
      List<TestConsumer> consumers = new ArrayList<>(); 
      for (int i = 0; i < N; i++) { 
       TestConsumer consumer = new TestConsumer(list, i); 
       Thread t = new Thread(consumer); 
       t.setName("Consumer " + i); 
       t.start(); 
       consumers.add(consumer); 
       running.add(t); 
      } 
      // Wait for a while. 
      Thread.sleep(5000); 
      // Close down all. 
      for (TestProducer p : producers) { 
       p.stop = true; 
      } 
      for (TestConsumer c : consumers) { 
       c.stop = true; 
      } 
      // Wait for all to stop. 
      for (Thread t : running) { 
       System.out.println("Joining " + t.getName()); 
       t.join(); 
      } 
      // What results did we get? 
      int totalMessages = 0; 
      for (int i = 0; i < N; i++) { 
       // How far did the producer get? 
       int gotTo = producers.get(i).sequence; 
       // The consumer's state 
       int seenTo = seen[i].get(); 
       totalMessages += seenTo; 
       Set<Widget> queue = queued[i]; 
       if (seenTo == gotTo && queue.isEmpty()) { 
        System.out.println("Producer " + i + " ok."); 
       } else { 
        // Different set consumed as produced! 
        System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue); 
       } 
      } 
      System.out.println("Total messages " + totalMessages); 

     } catch (InterruptedException ex) { 
      ex.printStackTrace(); 
     } 
    } 
}