저는 영어가 모국어가 아닌 사람이지만 가능한 한 명확하게 질문하려고합니다. 이 문제가 발생하여 2 일 동안 나를 혼란스럽게 만들었으며 해결책을 찾지 못했습니다.스프링 클라우드 데이터 흐름에서 싱크 구성 요소가 카프카로 올바른 데이터를 가져 오지 못함
Hadoop YARN의 Spring Could Data Flow에서 실행될 스트림을 만들었습니다.
스트림은 Http 소스, 프로세서 및 파일 싱크로 구성됩니다.
1.Http 소스
는 HTTP 소스 컴포넌트는 application.properties 정의하고 dest2 dest1에있는 두 개의 다른 목적지 바인딩 2 개 개의 출력 채널을 갖는다. 다음은
spring.cloud.stream.bindings.output.destination = dest1에 spring.cloud.stream.bindings.output2.destination = dest2
는 참조 용 HTTP 소스 코드 snipet입니다 ..@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
2 프로세서
프로세서는 다른 목적지로 결합 여러 개의 입력 채널과 2 개 개의 출력 채널을 갖는다. 대상 바인딩은 프로세서 구성 요소 프로젝트의 application.properties에 정의되어 있습니다.
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
다음은 프로세서의 코드 스 니펫입니다.
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";;
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
3. 파일 싱크 구성 요소.
저는 Spring의 공식적인 fil sink 구성 요소를 사용합니다. 받는다는 : //org.springframework.cloud.stream.app : 파일 싱크 카프카 : 1.0.0.BUILD-SNAPSHOT
그리고 난 그냥 그 applicaiton.properties 파일에 바인딩 대상을 추가 할 수 있습니다. spring.cloud.stream.bindings.input.destination = fileSink
4.Finding :
I이 좋아한다 예상 데이터 흐름 :
Source.handleRequest을() -> 프로세서 .handleRequest()
Source.handleRequest2() -> Processor.handleRequest2() -> Sink.fileWritingMessageHandler();
"transform2에서 처리 한"문자열 만 파일에 저장해야합니다.
하지만 내 테스트 후
는, 데이터 흐름은 다음과 같이 실제입니다 :Source.handleRequest() -> Processor.handleRequest() -> Sink.fileWritingMessageHandler(); 012.htm.fileWritingMessageHandler();
"처리 된 transform1"과 "처리 된 transform2"문자열은 모두 파일에 저장됩니다.
5.Question
: Processor.handleRequest의 출력 채널()에 대한 대상 대신 fileSink의 hdfsSink에 결합되지만는 데이터 싱크 여전히 제기 흐른다. 나는 이것을 이해할 수 없으며 이것은 내가 원하는 것이 아니다. 난 단지 Processor.handleRequest2()에서 데이터를 모두 대신 파일 싱크를 싶어요. 내가 제대로하지 않는다면 누구나 내게 그 방법을 알려주고 해결책은 무엇일까요? 2 일 동안 나를 혼란스럽게 만들었습니다.
친절하게 도와 주셔서 감사합니다.
알렉스
안녕하세요 마리우스, 답변 해 주셔서 감사합니다. 이제 작동합니다. 정말 고마워요. 정말 도움이됩니다. –
그런데 오늘 오후에 YARN에서 테스트를 수행했을 때 YARN의 SCDF가/dataflow/artifacts/cache 폴더의 Apps를 캐시 할뿐만 아니라 데이터베이스와 같은 다른 곳에서도 캐시 된 것으로 나타났습니다. DB에 앱을 캐시하고 있는지 잘 모르겠습니다. 내가 스트림을 파괴하고 Apps를 등록 취소하고/dataflow/artifacts/cache 폴더의 모든 파일을 삭제 했으므로 캐시 된 Apps를 사용하여 Yarn의 SCDF가 여전히 배포 된 것으로 나타났습니다. 마침내, 나는 선택의 여지가 있지만 SCDF 서버를 다시 시작하면 작동합니다. 서버를 다시 시작하지 않고 캐시 데이터를 완전히 지우는 방법을 알고 계십니까? 감사합니다. –
이 [문서의 섹션] (http://docs.spring.io/spring-cloud-dataflow-server-yarn/docs/1.0.1.RELEASE/reference/htmlsingle/#yarn-how- it-works) 유용 할 수 있습니다. 캐싱은'/ dataflow/artifacts/cache'와'hdfs' 디렉토리에서 이루어 지므로 캐쉬를 지울 때마다 서버를 재시작해야 할 수도 있습니다. 이 워크 플로우에 대한 피드백 및/또는 개선 사항을 언제든지 공유하십시오. –