Logstash를 통해 셀러리 작업을 트리거하는 방법을 설명 할 수 있습니까? 가능합니까?Logstash - RabbitMQ를 통해 샐러리 작업을 실행하는 방법
나는 'PHP-amqplib'라이브러리를 통해 PHP에서 그렇게하려고 그것을 잘 작동합니다 경우 : 셀러리 문서에 따르면 (Logstash를 사용하지 않고)
$connection = new AMQPStreamConnection(
'rabbitmq.local',
5672,
'guest',
'guest'
);
$channel = $connection->channel();
$channel->queue_declare(
'celery',
false,
true,
false,
false
);
$taskId = rand(1000, 10000);
$props = array(
'content_type' => 'application/json',
'content_encoding' => 'utf-8',
);
$body = array(
'task' => 'process_next_task',
'lang' => 'py',
'args' => array('ktest' => 'vtest'),
'kwargs' => array('ktest' => 'vtest'),
'origin' => '@'.'mytest',
'id' => $taskId,
);
$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');
:
http://docs.celeryproject.org/en/latest/internals/protocol.html
json 형식의 요청을 보내려고합니다.이 필터는 Logstash 필터입니다.
ruby
{
remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
code => "
event.set('properties',
{
:content_type => 'application/json',
:content_encoding => 'utf-8'
})
"
}
그리고 셀러리 대답은 : 기본적으로
[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}
, 셀러리 내 메시지 형식 또는 디코딩 할 수 없습니다 더 나은 ... 내가 JSON 형식 :
그것은이다의 요청을 설정할 수 아니에요 날 미치게 어떤 단서가
그것을 잊으 :) 사전에 감사합니다,이 정보 제공하기에서 Logstash 내 출력 플러그인rabbitmq
{
key => "celery"
exchange => "celery"
exchange_type => "direct"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
host => "${RABBITMQ_HOST}"
port => "${RABBITMQ_PORT}"
durable => true
persistent => true
codec => json
}