2012-04-20 3 views
1

저는 현재 레일스 프로세스와 여러 이벤트 (객체 생성과 같은)에 대해 통보해야하는 작업자 프로세스가있는 아키텍처를 만들고 있습니다.루비 메시지 버스 젬이 있습니까?

Rails 
    |   API Worker 
    +----------o------o--------o------ - - - 
         Some other 
          daemon 

나는 프로세스 (API가, 노동자, 데몬, ...) 바로 메시지 버스에 가입 블록이 때 메시지를 호출하는 동안

class Article 
    after_creation do 
    MessageBus.send type: "article-created", id: self.id 
    end 
end 

다음을 수행하고 싶습니다 제공됩니다.

MessageBus.subscribe do |msg| 
    if msg['type'] == 'article-created' 
    # if this is my websocket daemon, I would push the article to the browser 
    # if this is my indexing daemon, I would reindex the full-text search 
    # if this is ... you get the deal. 
    end 
end 

을 현재 내가 UNIXSocket과에서 JSON을 누르고 EventMachine.start_unix_domain_server으로 그것을 얻을 로컬 유닉스 도메인 소켓을 사용하고 있습니다. 그러나 그것은 양방향 통신 만 허용합니다. 나는 또한 resque 사용에 대해 생각했다. 그러나 버스가 필요한 동안 이것은 메시지 대기열이다. 그리고 그것은 redis에 달려 있습니다. 루비에서 메시지 버스를 구현하는 보석이 있어야한다고 확신하지만, 검색 결과가 전혀 나오지 않았습니다.

+2

EventMachine 채널을 사용해 보셨습니까? EventMachine 채널의 경우 http://eventmachine.rubyforge.org/EventMachine/Channel.html – tommasop

+0

+1. (Workers가 마스터와 동일한 프로세스 공간에서 실행되는 경우) 고려해야 할 또 다른 사항은 Ruby가 Observer 패턴을 기본적으로 지원한다는 것입니다. [Observable] (http://www.ruby-doc.org/stdlib-1.9) .3/libdoc/observer/rdoc/Observable.html) –

+2

참조 : https://github.com/SamSaffron/message_bus –

답변

1

마지막으로 Eventmachine Channels를 사용하여 간단한 솔루션을 해킹했습니다.

이것은 내 서버입니다. 기본적으로 클라이언트는 /tmp/messagebus.sock에 연결하여 데이터를 보냅니다. 소켓에 푸시 된 모든 것은 다른 모든 클라이언트로 보내집니다.

require 'rubygems' 
require 'eventmachine' 

module Messagebus 
    class Server 
    attr_accessor :connections 
    attr_accessor :channel 

    def initialize 
     @connections = [] 
     @channel = EventMachine::Channel.new 
    end 

    def start 
     @signature = EventMachine.start_unix_domain_server '/tmp/messagebus.sock', Connection do |conn| 
     conn.server = self 
     end 
    end 

    def stop 
     EventMachine.stop_server(@signature) 

     unless wait_for_connections_and_stop 
     EventMachine.add_periodic_timer(1) { wait_for_connections_and_stop } 
     end 
    end 

    def wait_for_connections_and_stop 
     if @connections.empty? 
     EventMachine.stop 
     true 
     else 
     puts "Waiting for #{@connections.size} connection(s) to finish ..." 
     false 
     end 
    end 
    end 

    class Connection < EventMachine::Connection 
    attr_accessor :server 

    def post_init 
     log "Connection opened" 
    end 

    def server=(server) 
     @server = server 

     @subscription = server.channel.subscribe do |data| 
     self.log "Sending #{data}" 
     self.send_data data 
     end 
    end 

    def receive_data(data) 
     log "Received #{data}" 
     server.channel.push data 
    end 

    def unbind 
     server.channel.unsubscribe @subscription 
     server.connections.delete(self) 
     log "Connection closed" 
    end 

    def log(msg) 
     puts "[#{self.object_id}] #{msg}" 
    end 
    end 
end 

EventMachine::run { 
    s = Messagebus::Server.new 
    s.start 
    puts "New server listening" 
}