0
RabbitMQ에서 메시지를 읽어 Oracle 데이터베이스에 쓰려고 합니다. Spring Boot Batch를 사용해 메시지를 읽도록 구성했지만, 실행하면 다음 오류와 함께 종료됩니다: org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate. ('queue'가 지정되지 않았으니 RabbitTemplate 구성을 확인하라는 의미입니다.)
RabbitConfig.java
/**
 * Spring configuration that declares the RabbitMQ beans used by the application:
 * a queue, a topic exchange, the binding between them, a JSON message converter,
 * a {@link RabbitTemplate} with a default receive queue, and the connection factory.
 */
@Configuration
public class RabbitMQConfig {

    // Candidate for externalization: @Value("${conveh.rabbitmq.queue}")
    public String queueName = "hello2";

    // Candidate for externalization: @Value("${conveh.rabbitmq.exchange}")
    public String exchange = "hello_exchage2";

    /**
     * Declares the queue named by {@link #queueName}; the second constructor
     * argument ({@code false}) is passed through unchanged.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    /**
     * Declares the topic exchange named by {@link #exchange}.
     *
     * @return the exchange definition
     */
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(exchange);
    }

    /**
     * Binds the queue to the exchange, using the queue name as the routing key.
     *
     * @param queue    the queue bean to bind
     * @param exchange the exchange bean to bind to
     * @return the binding definition
     */
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    /**
     * Provides the JSON converter installed on the template.
     *
     * @return a Jackson-based message converter
     */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Builds the template that receives from {@link #queueName} and converts payloads as JSON.
     *
     * <p>The return type is the concrete {@link RabbitTemplate} rather than the
     * {@code AmqpTemplate} interface. Injection points typed as {@code RabbitTemplate}
     * are matched against the declared factory-method type, so the narrower declaration
     * makes sure they receive this instance (with its queue configured) and not some
     * other template. Callers that ask for {@code AmqpTemplate} are unaffected.
     *
     * @param connectionFactory the connection factory the template should use
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // NOTE(review): newer Spring AMQP releases name this setter
        // setDefaultReceiveQueue - confirm against the version in use.
        rabbitTemplate.setQueue(queueName);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Creates the broker connection factory from the hard-coded connection settings below.
     *
     * @return the caching connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // NOTE(review): host, port, virtual host and credentials look like placeholders;
        // presumably they should come from external configuration - confirm.
        connectionFactory.setHost("host");
        connectionFactory.setPort(123);
        connectionFactory.setVirtualHost("/xxx");
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");
        return connectionFactory;
    }
}
BatchConfig.java — 큐 이름은 제대로 지정한 것 같은데, 무엇이 빠졌을까요?
/**
 * Spring Batch configuration for the "importJob" job: a single chunk-oriented step
 * that reads {@code VehicleEvent} items through an {@link AmqpItemReader} backed by
 * the injected {@link RabbitTemplate} and writes them to a delimited flat file.
 */
@Configuration
@EnableBatchProcessing
public class BatchMqListener {
    // private final Logger logger = LoggerFactory.getLogger(ImportJobConfig.class);

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Assembles the "importJob" job with a console-logging execution listener,
     * a run-id incrementer, and the single "importStep" step (chunk size 2).
     *
     * @return the configured job
     */
    @Bean
    public Job importJob() {
        return jobBuilderFactory.get("importJob")
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        // logger.info("Ready to start the job");
                        System.out.println("Ready to start the job");
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        // The message is printed without consulting jobExecution's status.
                        // logger.info("Job successfully executed.");
                        System.out.println("Job successfully executed.");
                    }
                })
                .incrementer(new RunIdIncrementer())
                .flow(stepBuilderFactory.get("importStep")
                        .<VehicleEvent, VehicleEvent>chunk(2)
                        .reader(new AmqpItemReader<VehicleEvent>(rabbitTemplate))
                        .listener(new QueueListener<VehicleEvent>())
                        .processor(customProcessor())
                        .writer(writer())
                        .build())
                .end()
                .build();
    }

    /**
     * Supplies the step's item processor. No transformation is applied: each item
     * is handed to the writer unchanged. A real pass-through instance is returned
     * instead of {@code null} so that callers never have to null-check the result.
     *
     * @return a processor that returns its input as-is
     */
    public ItemProcessor<VehicleEvent, VehicleEvent> customProcessor() {
        return new ItemProcessor<VehicleEvent, VehicleEvent>() {
            @Override
            public VehicleEvent process(VehicleEvent item) {
                return item;
            }
        };
    }

    /**
     * Builds the flat-file writer targeting {@code output/item.all.csv} with appending
     * enabled. Each item is rendered as a comma-delimited line of its {@code id} and
     * {@code itemName} properties.
     *
     * @return the configured writer
     */
    @Bean
    public FlatFileItemWriter<VehicleEvent> writer() {
        // log.info("writer called");

        // Plain construction + setters instead of double-brace initialization, which
        // would create anonymous subclasses holding a reference to this configuration.
        BeanWrapperFieldExtractor<VehicleEvent> fieldExtractor = new BeanWrapperFieldExtractor<VehicleEvent>();
        fieldExtractor.setNames(new String[]{"id", "itemName"});

        DelimitedLineAggregator<VehicleEvent> lineAggregator = new DelimitedLineAggregator<VehicleEvent>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        FlatFileItemWriter<VehicleEvent> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("output/item.all.csv"));
        writer.setAppendAllowed(true);
        writer.setLineAggregator(lineAggregator);
        return writer;
    }
}
.