2017-04-21 4 views
0

spring-cloud-stream-binder-ibm-mq을 사용하여 MQueue에서 스프링 부트 스트림을 사용하는 방법을 알아 내려고하고 있습니다. 나는 MQueue에 연결할 수 있지만 Could not provision topic 'queue///EMB_DEV_QUEUE'MQJE001: Completion Code '2', Reason '2035'을 얻는다. 관리자가 큐가 아니라 주제인지 확인했습니다.스프링 스트림이 IBM MQueue에서 큐가 아닌 토픽을 만들려고 시도합니다

simplest-sample-applications-using-websphere-mq-jms을 기반으로하는 MQQueueConnectionFactory을 사용하는 일부 샘플 코드를 사용하여 연결할 수 있으므로 MQueue가 작동 중임을 알고 있습니다.

여기 내 프로그램입니다. 나는 성공과 함께 카프카에게 동일한 패턴을 사용했다.

@EnableBinding({Sink.class, Source.class}) 
@SpringBootApplication 
public class MQueueStreamApplication { 

    private final static AtomicInteger counter = new AtomicInteger(); 
    private final  Logger  logger = LoggerFactory.getLogger(getClass()); 

    public static void main(String[] args) { 
     SpringApplication.run(MQueueStreamApplication.class, args); 
    } 

    @Bean 
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "2000")) 
    public MessageSource<String> timeSource() { 
     return() -> { 
      String message = String.format("Timed Message %d", counter.incrementAndGet()); 

      logger.info("Producing Message: {}", message); 

      return MessageBuilder.withPayload(message).setHeader("Message-Type", "mqueue-stream").build(); 
     }; 
    } 

    @ServiceActivator(inputChannel = Sink.INPUT) 
    public void serviceSink(Message<String> message) { 
     String payload = String.valueOf(message.getPayload()); 

     logger.info("Received Message: {} [{}]", payload, message.getHeaders()); 
    } 

} 

여기가 내 application.yml입니다. 나는 queue:/// 접두사가있는 경우와없는 경우 시도했다. 샘플 프로그램은 접 두부와 함 2 작동합니다.

spring: 
    cloud: 
    stream: 
     bindings: 
     input: 
      destination: queue:///EMB_DEV_QUEUE 
      group: mqueue-stream 
#   binder: ibmmq 
     output: 
      destination: queue:///EMB_DEV_QUEUE 

ibmmq: 
    host: vm-dev-q01.corp.int 
    port: 1414 
    channel: EMB_DEV_CHANNEL 
    queueManager: EMB_DEV_QMGR 

여기가 내 지형도 작성 점입니다.

buildscript { 
    ext { 
     springBootVersion = '1.5.3.RELEASE' 
    } 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'idea' 
apply plugin: 'org.springframework.boot' 

version = '0.0.1-SNAPSHOT' 

sourceCompatibility = 1.8 
targetCompatibility = 1.8 

repositories { 
    mavenLocal() 
    mavenCentral() 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-actuator') 

    compile('org.springframework.cloud:spring-cloud-stream') 
    compile('org.springframework.cloud:spring-cloud-stream-binder-jms-ibm-mq:1.0.0.BUILD-SNAPSHOT') 

    testCompile('org.springframework.boot:spring-boot-starter-test') 
} 

dependencyManagement { 
    imports { 
     mavenBom "org.springframework.cloud:spring-cloud-dependencies:Dalston.RELEASE" 
    } 
} 

나는 지침에 따라 spring-cloud-stream-binder-ibm-mq을 만들었습니다. MQueue 설치에서 필요한 두 개의 jar 파일이 있습니다. 매니페스트는 버전 9.0.0.0을 사용 했으므로 pom.xml에서

나는 새로운 것으로, Streams 사용 경험이 제한적입니다. 나는 카프카에 성공적으로 연결할 수있었습니다. 나는 어떤 도움을 주셔서 감사합니다.

웨스.

답변

0

스프링 클라우드 스트림은 소비자 그룹 및 파티셔닝과 같은 기능을 구현할 수 있도록 일반 JMS/IBM-MQ 애플리케이션보다 철저한 인프라를 사용합니다.이 경우 대상은 주제입니다. 자세한 내용은 https://github.com/spring-cloud/spring-cloud-stream-binder-ibm-mq#how-it-works을 참조하십시오.

+0

나는 몇 번이나 여전히 확신 할 수 없다는 것을 읽었다. 아마도 MQ에 대한 나의 경험 부족. IBM docs는 Topic이 레이블 일 뿐이지 만 topic.queue 규칙을 사용하여 Topic에서 Topics와 Queues에 액세스하는 것으로 보이는 코드를 단계적으로 진행 함을 의미합니다. 그것은 마치 구조처럼 들리거나 서브 스크립 션이라고 할 수 있습니까? 내 목표는 처리 할 수없는 경우 메시지를 반환하는 소비자를 갖는 것입니다. 처음에 나는 Client_Ack를 생각했지만 지금은 2 개의 목적지를 생각하고 있습니다. 재 시도가 실패한 경우 재시도 및 기타 재 시도가 필요합니다. 그들은 주제가되어야합니까? 대기열이 같은 대기열로 재 시도하려고합니다. 다른 모습이 아닙니다. 고마워 – Wes