2012-10-04 5 views
5

작은 Clojure 소비자/게시자가 메시지를 받고이를 처리하고 다른 소비자에게 RabbitMQ를 통해 메시지를 보냅니다.Clojure 메시지 처리/비동기, 다중 스레드

나는 메인 스레드 인 별도의 스레드에서 메시지를 처리하는 메시지 처리기를 정의했습니다. 아래 코드에서 알 수 있듯이 스레드는 메시지를 동시에 수신하고 전송하며 lcm/subscribe 함수에 의해 시작된 이벤트 루프에서 모두 발생합니다.

그래서, 이러한 동기식 메시지 처리기의 N 크기 스레드 풀을 만드는 "Clojure way"는 무엇이 될까요? Clojure가 아닌 방법은 Java interop을 통해 수 개의 스레드를 수동으로 생성하는 것입니다.

또한 처리 속도가 CPU를 많이 사용하지 않는다는 것을 고려하면 메시지 처리 속도가 빨라지겠습니까? 이러한 메시지 처리기를 비동기로 만드는 것이 좋습니다. 처리보다 게시에 더 많은 시간을 소비하는 것을 고려하면 다시 생각해보십시오.

마지막으로, 이러한 경쟁 방식 (Ruby/Javascript 세계에서 왔으며 거기에 멀티 스레딩이 없음)의 성능을 측정하려면 어떻게해야합니까?

참고 : 나는이 모든 단지 수평으로 확장하고, 메시지 버스를 듣고 더 JVM 프로세스를 산란에 의해 피할 수 있습니다 알고 있지만, 앱을 Heroku가에 배포 할 것입니다 때문에, 내가 같이 사용하고 싶습니다 가능한 한 많은 리소스를 각 dyno/프로세스에서 사용할 수 있습니다.

더 기본적인 노트에
(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

...이처럼 추가적인 인수와 함께 메시지 핸들러 콜백을 등록 지원하기 위해이 코드를 리팩토링에 대해 갈 것입니다 방법 :

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

및 다음을 참조하여 배포합니다 : 대신 리터럴의

(lb/publish ch pub-name "" processed-message))) 

을 :

(lb/publish ch "e.events" "" processed-message))) 

답변

2

질문의 두 번째 부분의 경우, 일부 응용 프로그램을 사용할 수 있습니다 : 팁을위한

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

이것은 매우 큰 주제이므로이 질문을 여러 가지 질문으로 구분할 수는 있지만 간결한 대답은 use agents입니다. 아래 그림과 같이

+0

감사를 할 것입니다. – neektza