spring-cloud-stream-binder-ibm-mq을 사용하여 MQueue에서 스프링 부트 스트림을 사용하는 방법을 알아 내려고하고 있습니다. 나는 MQueue에 연결할 수 있지만 Could not provision topic 'queue///EMB_DEV_QUEUE'
과 MQJE001: Completion Code '2', Reason '2035'
을 얻는다. 관리자가 큐가 아니라 주제인지 확인했습니다.스프링 스트림이 IBM MQueue에서 큐가 아닌 토픽을 만들려고 시도합니다
simplest-sample-applications-using-websphere-mq-jms을 기반으로하는 MQQueueConnectionFactory
을 사용하는 일부 샘플 코드를 사용하여 연결할 수 있으므로 MQueue가 작동 중임을 알고 있습니다.
여기 내 프로그램입니다. 나는 성공과 함께 카프카에게 동일한 패턴을 사용했다.
@EnableBinding({Sink.class, Source.class})
@SpringBootApplication
public class MQueueStreamApplication {
private final static AtomicInteger counter = new AtomicInteger();
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(MQueueStreamApplication.class, args);
}
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000"))
public MessageSource<String> timeSource() {
return() -> {
String message = String.format("Timed Message %d", counter.incrementAndGet());
logger.info("Producing Message: {}", message);
return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build();
};
}
@ServiceActivator(inputChannel = Sink.INPUT)
public void serviceSink(Message<String> message) {
String payload = String.valueOf(message.getPayload());
logger.info("Received Message: {} [{}]", payload, message.getHeaders());
}
}
여기가 내 application.yml
입니다. 나는 queue:///
접두사가있는 경우와없는 경우 시도했다. 샘플 프로그램은 접 두부와 함 2 작동합니다.
spring:
cloud:
stream:
bindings:
input:
destination: queue:///EMB_DEV_QUEUE
group: mqueue-stream
# binder: ibmmq
output:
destination: queue:///EMB_DEV_QUEUE
ibmmq:
host: vm-dev-q01.corp.int
port: 1414
channel: EMB_DEV_CHANNEL
queueManager: EMB_DEV_QMGR
여기가 내 지형도 작성 점입니다.
buildscript {
ext {
springBootVersion = '1.5.3.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
compile('org.springframework.boot:spring-boot-starter-actuator')
compile('org.springframework.cloud:spring-cloud-stream')
compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT')
testCompile('org.springframework.boot:spring-boot-starter-test')
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE"
}
}
나는 지침에 따라
spring-cloud-stream-binder-ibm-mq
을 만들었습니다. MQueue 설치에서 필요한 두 개의 jar 파일이 있습니다. 매니페스트는 버전
9.0.0.0
을 사용 했으므로
pom.xml
에서
나는 새로운 것으로, Streams 사용 경험이 제한적입니다. 나는 카프카에 성공적으로 연결할 수있었습니다. 나는 어떤 도움을 주셔서 감사합니다.
웨스.
나는 몇 번이나 여전히 확신 할 수 없다는 것을 읽었다. 아마도 MQ에 대한 나의 경험 부족. IBM docs는 Topic이 레이블 일 뿐이지 만 topic.queue 규칙을 사용하여 Topic에서 Topics와 Queues에 액세스하는 것으로 보이는 코드를 단계적으로 진행 함을 의미합니다. 그것은 마치 구조처럼 들리거나 서브 스크립 션이라고 할 수 있습니까? 내 목표는 처리 할 수없는 경우 메시지를 반환하는 소비자를 갖는 것입니다. 처음에 나는 Client_Ack를 생각했지만 지금은 2 개의 목적지를 생각하고 있습니다. 재 시도가 실패한 경우 재시도 및 기타 재 시도가 필요합니다. 그들은 주제가되어야합니까? 대기열이 같은 대기열로 재 시도하려고합니다. 다른 모습이 아닙니다. 고마워 – Wes