작은 Clojure 소비자/게시자가 메시지를 받고이를 처리하고 다른 소비자에게 RabbitMQ를 통해 메시지를 보냅니다.Clojure 메시지 처리/비동기, 다중 스레드
나는 메인 스레드 인 별도의 스레드에서 메시지를 처리하는 메시지 처리기를 정의했습니다. 아래 코드에서 알 수 있듯이 스레드는 메시지를 동시에 수신하고 전송하며 lcm/subscribe 함수에 의해 시작된 이벤트 루프에서 모두 발생합니다.
그래서, 이러한 동기식 메시지 처리기의 N 크기 스레드 풀을 만드는 "Clojure way"는 무엇이 될까요? Clojure가 아닌 방법은 Java interop을 통해 수 개의 스레드를 수동으로 생성하는 것입니다.
또한 처리 속도가 CPU를 많이 사용하지 않는다는 것을 고려하면 메시지 처리 속도가 빨라지겠습니까? 이러한 메시지 처리기를 비동기로 만드는 것이 좋습니다. 처리보다 게시에 더 많은 시간을 소비하는 것을 고려하면 다시 생각해보십시오.
마지막으로, 이러한 경쟁 방식 (Ruby/Javascript 세계에서 왔으며 거기에 멀티 스레딩이 없음)의 성능을 측정하려면 어떻게해야합니까?
참고 : 나는이 모든 단지 수평으로 확장하고, 메시지 버스를 듣고 더 JVM 프로세스를 산란에 의해 피할 수 있습니다 알고 있지만, 앱을 Heroku가에 배포 할 것입니다 때문에, 내가 같이 사용하고 싶습니다 가능한 한 많은 리소스를 각 dyno/프로세스에서 사용할 수 있습니다.
더 기본적인 노트에(defn message-handler
[ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
processed-message (process msg)]
(lb/publish ch "e.events" "" processed-message)))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name message-handler :auto-ack true))))))
...이처럼 추가적인 인수와 함께 메시지 핸들러 콜백을 등록 지원하기 위해이 코드를 리팩토링에 대해 갈 것입니다 방법 :
(.start (Thread. (fn []
(lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))
및 다음을 참조하여 배포합니다 : 대신 리터럴의
(lb/publish ch pub-name "" processed-message)))
을 :
(lb/publish ch "e.events" "" processed-message)))
감사를 할 것입니다. – neektza