2017-04-06 2 views
0

http://cloud.spring.io/spring-cloud-dataflow/에 설명 된 간단한 예제를 기반으로 "Hello, world"스프링 클라우드 데이터 흐름을 실행하려고합니다. 간단한 소스를 만들고 싱크대를 만들고 카프카를 사용하여 로컬 SCDF 서버에서 실행할 수 있습니다. 여기까지 모든 것이 정확합니다.K8에서 배포하지만 로컬 배포에서 작업 할 때 스트림> 소스에서 UNKNOWN_TOPIC_OR_PARTITION을 발생시킵니다.

이제는 http://docs.spring.io/spring-cloud-dataflow-server-kubernetes/docs/current-SNAPSHOT/reference/htmlsingle/#_getting_started에 나열된 지침에 따라 개인용 클라우드에 배포하려고합니다. 이 배포를 사용하면 간단한 "시간 | 로그"를 즉시 사용할 수있는 스트림을 문제없이 배포 할 수 있지만 예제가 실패합니다.

특정 버전은 다음과 같습니다

  • 도커 버전 1.13.1, 092cba3
  • Hyperkube 1.5.5
  • SCDF 1.2.0.M2
  • 를 구축 사육사 3.4.9-1757313에 내장 2016년 8월 23일 그리니치 표준시 06시 50분
  • 카프카 0.10.1.1

소스 이슈 로그 위치 :

2017-04-06T11 : 05 : 07.429204866Z 2017년 4월 6일 11 : 05 : 07,428 INFO 주 SendThread (10.0.0.181:2181) oazClientCnxn : 876 - 소켓 연결 10.0 세웠다. 0.181/10.0.0.181 : 2181, 시작 세션 2017-04-06T11 : 05 : 07.440381666Z 2017-04-06 11 : 05 : 07,439 정보 main-SendThread (10.0.0.181:2181) oazClientCnxn : 1299 - 세션 설정 완료 서버 10.0.0.181/10.0.0.181:2181, sessionid = 0x15b155ef61e014a, 협상 된 시간 초과 = 10000 2017-04-06T11 : 05 : 07.740130495Z 2017-04-06 11 : 05 : 07,737 정보 oakcpProducerConfig : 180 - ProducerConfig 값 : 2017-04-06T11 : 05 : 07.740160464Z acks = 1 2017-04-06T11 : 05 : 07.740163408Z batch.size = 16384 2017-04-06T11 : 05 : 07.740165226Z block.on.buffer.full = false 2017-04-06T11 : 05 : 07.740166942Z bootstrap.servers = [10.0.0.213:9092] 2017-04-06T11 : 05 : 07.740168741Z buffer.memory = 33554432 2017-04-06T11 : 05 : 07.740170545Z client.id = 2017-04-06T11 : 05 : 07.740172245Z compression.type = none 2017-04-06T11 : 05 : 07.740173971Z 연결 .max.idle.ms = 540000 2017-04-06T11 : 05 : 07.740175706Z interceptor.classes = null 2017-04-06T11 : 05 : 07.744179899Z reconnect.backoff.ms = 50 2017-04-06T11 : 05 : 07.744181600Z request.timeout.ms = 30000 2017-04-06T11 : 05 : 07.744183356Z 재시도 = 0 2017-04-06T11 : 05 : 07.744185083Z retry.backoff.ms = 100 2017-04-0 6T11 : 05 : 07.744186754Z sasl.kerberos.kinit.cmd =/usr/bin/kinit 2017-04-06T11 : 05 : 07.744188494Z sasl.kerberos.min.time.before.relogin = 60000 2017-04-06T11 : 05 : 07.744190205Z sasl.kerberos.service.name = null 2017-04-06T11 : 05 : 07.744191916Z sasl.kerberos.ticket.renew.jitter = 0.05 2017-04-06T11 : 05 : 07.744193763Z sasl.kerberos .ticket.renew.window.factor = 0.8 2017-04-06T11 : 05 : 07.744195432Z sasl.mechanism = GSSAPI 2017-04-06T11 : 05 : 07.744197163Z security.protocol = PLAINTEXT 2017-04-06T11 : 05 : 07.744198789Z send.buffer.bytes = 131072 2017-04-06T11 : 05 : 07.744200522Z ssl.cipher.suites = null 2017-04-06T11 : 05 : 07.744202328Z ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 2017-04-06T11 : 05 : 07.744204161Z ssl.endpoint.identification.algorithm = null 2017-04-06T11 : 05 : 07.744205837Z ssl.key.password = null 2017-04-06T11 : 05 : 07.744207544Z ssl.keymanager.algorithm = SunX509 2017-04-06T11 : 05 : 07.744212464Z ssl.keystore.location = null 2017-04-06T11 : 05 : 07.744214272Z ssl.keystore.password = null 2017-04-06T11 : 05 : 07.744216025Z ssl.keystore.type = JKS 2017-04-06T11 : 05 : 07.744217647Z ssl.protocol = TLS 2017-04-06T11 : 05 : 07.744219234Z ssl.provider = null 2017-04-06T11 : 05 : 07.744220987Z ssl.secure.random.implementation = null 2017-04-06T11 : 05 : 07.744222666Z ssl.trustmanager.algorithm = PKIX 2017-04- 06T11 : 05 : 07.744224359Z ssl.truststore.location = null 2017-04-06T11 : 05 : 07.744226022Z ssl.truststore.password = null 2017-04-0 6T11 : 05 : 07.744228171Z ssl.truststore.type = JKS 2017-04-06T11 : 05 : 07.744230006Z timeout.ms = 30000 2017-04-06T11 : 05 : 07.744231705Z value.serializer = class org.apache.kafka .common.serialization.ByteArraySerializer 2017-04-06T11 : 05 : 07.744233544Z 2017-04-06T11 : 05 : 07.837193978Z 2017-04-06 11 : 05 : 07,834 WARN 메인 oakcpProducerConfig : 188 - 구성 '키. deserializer '가 제공되었지만 알려진 구성이 아닙니다. 2017-04-06T11 : 05 : 07.837221870Z 2017-04-06 11 : 05 : 07,835 WARN 메인 o.a.k.c.p.ProducerConfig : 188 - 'value.deserializer'구성이 제공되었지만 알려진 구성이 아닙니다. 2017-04-06T11 : 05 : 07.929207703Z 2017-04-06 11 : 05 : 07,926 기본 정보 oakcuAppInfoParser : 83 - 카프카 버전 : 0.10.1.1 2017-04-06T11 : 05 : 07.929239636Z 2017-04-06 11 : 05 : 07,927 기본 정보 oakcuAppInfoParser : 84 - 카프카 커밋 ID : f10ef2720b03b247 2017-04-06T11 : 05 : 08.228817026Z 2017-04-06 11 : 05 : 08,228 경고 kafka-producer-network-thread | producer-1 oakcNetworkClient : 600 - 상관 관계 ID가 0 인 메타 데이터를 가져 오는 동안 오류가 발생했습니다 : {출력 = UNKNOWN_TOPIC_OR_PARTITION} 2017-04-06T11 : 05 : 08.436574800Z 2017-04-06 11 : 05 : 08,435 WARN kafka-producer- 스레드 | 생산자 - 1 oakcNetworkClient : 600 - 오류와 메타 데이터를 가져 오는 동안 상관 ID 1 : {출력 = UNKNOWN_TOPIC_OR_PARTITION}

그리고 Zookepeer 로그는 다음과 같습니다

2017-04-06T11 : 04 : 2017 38.000953447Z 세션 ID : 0x15b155ef61e0148 2017-04-06T11 : 05 : 04.939356606Z (으)로 처리 된 세션 종료 : -04-06 11 : 04 : 38,000 [myid :] - 정보 [ProcessThread (sid : 0 cport : 2181) :: PrepRequestProcessor @ 487] 2017-04-06 11 : 05 : 04,938 [myid :] - 정보 [NIOServerCxn.Factory : 0.0.0.0/0.0.0 : 2181 : NIOServerCnxnFactory @ 192] - /10.1.98.5:48180에서 허용되는 소켓 연결 2017-04 -06T11 : 05 : 04.940666418Z 2017 -04-06 11 : 05 : 04,939 [정보 : NIOServerCxn.Factory : 0.0.0.0/0.0.0.0 : 2181 : ZooKeeperServer @ 928] - /10.1.98.5:48180에 새로운 세션을 설정하려는 클라이언트 2017 -04-06T11 : 05 : 04.943859474Z 2017-04-06 11 : 05 : 04,943 [정보 : SyncThread : 0 : ZooKeeperServer @ 673] 클라이언트에 대해 협상 된 시간 제한 10000으로 세션 0x15b155ef61e0149를 설정 /10.1.98.5:2017-04-06T11 : 05 : 07.325929074Z 2017-04-06 11 : 05 : 07,325 [myid :] - ProcessOutputStream (sid : 0 cport : 2181) :: PrepRequestProcessor @ 487] - sessionid에 대한 세션 종료 처리 : 0x15b155ef61e0149 2017-04-06T11 : 05 : 07.342876962Z 2017-04-06 11 : 05 : 07,341 [정보 : NIOServerCxn.Factory : 0.0.0.0/0.0.0.0 : 2181 : NIOServerCnxn @ 1008] - 닫힌 소켓 세션 ID가 0x15b155ef61e0149 인 클라이언트 /10.1.98.5:48180에 대한 연결 2017-04-06T11 : 05 : 07.429909440Z 2017-04-06 11 : 05 : 07,429 [myid :] - 정보 [NIOServer 함수 : 0.0.0.0/0.0.0.0 : 2181 : NIOServerCnxnFactory @ 192] - /10.1.98.5:48182에서 소켓 연결 허용 2017-04-06T11 : 05 : 07.429933377Z 2017-04-06 11 : 05 : 07,429 [myid :] - INFO [NIOServerCxn.Factory : 0.0.0.0/0.0.0.0 : 2181 : ZooKeeperServer @ 928] - /10.1.98.5:48182 2017-04-06T11 : 05 : 07에 새 세션을 설정하려고하는 클라이언트.441158222Z 2017-04-06 11 : 05 : 07,439 [정보 : SyncThread : 0 : ZooKeeperServer @ 673] - 클라이언트의 협상 된 시간 제한 10000으로 세션 0x15b155ef61e014a 설정 /10.1.98.5:48182 2017-04-06T11 : 05 : 29.695276997Z 2017-04-06 11 : 05 : 29,694 [myid :] - WARN [NIOServerCxn.Factory : 0.0.0.0/0.0.0.0 : 2181 : NIOServerCnxn @ 357] - 스트림 종료 예외가 발생했습니다. 2017-04-06T11 : 05 : 29.695325790Z EndOfStreamException : 클라이언트 sessionid 0x15b155ef61e014a에서 추가 데이터를 읽을 수 없습니다. 클라이언트가 소켓 2017-04-06T11 : 05 : 29.695328912Z를 닫았습니다. (org.apache.zookeeper.server.NIOServerCnxn.doIO (NIOServerCnxn.java : 228) 2017-04-06T11 : 05 : 29.695331119Z, org.apache.zookeeper.server.NIOServerCnxnFactory.run (NIOServerCnxnFactory.java:203) 2017-04-06T11 : 05 : 29.695333009Z at java.lang.Thread. 실행 (Thread.java:745) 2017-04-06T11 : 05 : 29.696333706Z 2017-04-06 11 : 05 : 29,696 [정보 : NIOServerCxn.Factory : 0.0.0.0/0.0.0.0 : 2181 : NIOServerCnxn @ 1008] - 폐쇄 형 소켓 연결 클라이언트

예외가 발생하면 카프카에 로그인하지 마십시오. 소스 클래스

코드 스트림 소스를 포함하고있는 포드는 루프에서 충돌 계속, 그래서

package xxxx; 

import java.text.SimpleDateFormat; 
import java.util.Date; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.messaging.Source; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.messaging.support.GenericMessage; 

@SpringBootApplication 
@EnableBinding(Source.class) 
public class HelloNitesApplication 
{ 
    public static void main(String[] args) 
    { 
     SpringApplication.run(HelloNitesApplication.class, args); 
    } 

    @Bean 
    @InboundChannelAdapter(value = Source.OUTPUT) 
    public MessageSource<String> timerMessageSource() 
    { 
     return() -> new GenericMessage<>("Hello " + new SimpleDateFormat().format(new Date())); 
    } 

입니다.

답변

0

문제는 "spring.cloud.stream.bindings.output.destination = XXX"속성이 내 구현에서 무시되고 실행하기 전에 주제가 "출력"으로 삭제 된 것으로 예상됩니다. 속성에 지정된 항목에 쓰기

내가 정의한 속성 대신 "출력"항목에 메시지를 삽입했지만 주제가 올바르게 만들어지면 모든 것이 다시 배포됩니다.