3

저는 영어가 모국어가 아닌 사람이지만 가능한 한 명확하게 질문하려고합니다. 이 문제가 발생하여 2 일 동안 나를 혼란스럽게 만들었으며 해결책을 찾지 못했습니다.스프링 클라우드 데이터 흐름에서 싱크 구성 요소가 카프카로 올바른 데이터를 가져 오지 못함

Hadoop YARN의 Spring Could Data Flow에서 실행될 스트림을 만들었습니다.

스트림은 Http 소스, 프로세서 및 파일 싱크로 구성됩니다.

1.Http 소스
는 HTTP 소스 컴포넌트는 application.properties 정의하고 dest2 dest1에있는 두 개의 다른 목적지 바인딩 2 개 개의 출력 채널을 갖는다. 다음은

spring.cloud.stream.bindings.output.destination = dest1에 spring.cloud.stream.bindings.output2.destination = dest2

는 참조 용 HTTP 소스 코드 snipet입니다 ..

@Autowired 
    private EssSource channels; //EssSource is the interface for multiple output channels 

##output channel 1: 
    @RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"}) 
    @ResponseStatus(HttpStatus.ACCEPTED) 
    public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) { 
     logger.info("enter ... handleRequest1..."); 
     channels.output().send(MessageBuilder.createMessage(body, 
       new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)))); 
    } 

##output channel 2: 
    @RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"}) 
    @ResponseStatus(HttpStatus.ACCEPTED) 
    public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) { 
     logger.info("enter ... handleRequest2..."); 
     channels.output2().send(MessageBuilder.createMessage(body, 
       new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)))); 
    } 

2 프로세서
프로세서는 다른 목적지로 결합 여러 개의 입력 채널과 2 개 개의 출력 채널을 갖는다. 대상 바인딩은 프로세서 구성 요소 프로젝트의 application.properties에 정의되어 있습니다.

//input channel binding 
spring.cloud.stream.bindings.input.destination=dest1 
spring.cloud.stream.bindings.input2.destination=dest2 

//output channel binding 
spring.cloud.stream.bindings.output.destination=hdfsSink 
spring.cloud.stream.bindings.output2.destination=fileSink 

다음은 프로세서의 코드 스 니펫입니다.

@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT) 
    public Object transform(Message<?> message) { 
     logger.info("enter ...transform..."); 

     return "processed by transform1";; 
    } 


    @Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2) 
    public Object transform2(Message<?> message) { 
     logger.info("enter ... transform2..."); 
     return "processed by transform2"; 
    } 

3. 파일 싱크 구성 요소.

저는 Spring의 공식적인 fil sink 구성 요소를 사용합니다. 받는다는 : //org.springframework.cloud.stream.app : 파일 싱크 카프카 : 1.0.0.BUILD-SNAPSHOT

그리고 난 그냥 그 applicaiton.properties 파일에 바인딩 대상을 추가 할 수 있습니다. spring.cloud.stream.bindings.input.destination = fileSink

4.Finding :

I이 좋아한다 예상 데이터 흐름 :

Source.handleRequest을() -> 프로세서 .handleRequest()

Source.handleRequest2() -> Processor.handleRequest2() -> Sink.fileWritingMessageHandler();

"transform2에서 처리 한"문자열 만 파일에 저장해야합니다.

하지만 내 테스트 후

는, 데이터 흐름은 다음과 같이 실제입니다 :

Source.handleRequest() -> Processor.handleRequest() -> Sink.fileWritingMessageHandler(); 012.htm.fileWritingMessageHandler();

"처리 된 transform1"과 "처리 된 transform2"문자열은 모두 파일에 저장됩니다.

5.Question

: Processor.handleRequest의 출력 채널()에 대한 대상 대신 fileSink의 hdfsSink에 결합되지만

는 데이터 싱크 여전히 제기 흐른다. 나는 이것을 이해할 수 없으며 이것은 내가 원하는 것이 아니다. 난 단지 Processor.handleRequest2()에서 데이터를 모두 대신 파일 싱크를 싶어요. 내가 제대로하지 않는다면 누구나 내게 그 방법을 알려주고 해결책은 무엇일까요? 2 일 동안 나를 혼란스럽게 만들었습니다.

친절하게 도와 주셔서 감사합니다.

알렉스

답변

2

합니다 ('-2'버전은 여러 채널을 가진 것들)이 같은 스트림 정의 뭔가 있습니까? 봄 클라우드 데이터 흐름이 프로세서의 spring.cloud.stream.bindings.output.destinationhdfs-sink으로 설정되어있는 경우에도, 이유입니다 applications.properties에 정의 된 목적지를 오버라이드 (override) 할 것을

http-source-2 | processor-2 | file-sink 

주, 실제로 file-sink의 입력과 일치합니다.

목적지 (탭의 맥락에서) 여기에서 설명되는 스트림 정의에서 구성하는 방법 : 당신이 할 수있는 것은 http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl

단순히 채널 1과 2의 의미를 교환하는 -에 대한 측면 채널을 사용 hdfs. 이것은 약간 깨지기 쉽습니다. 스트림의 input/output 채널이 자동으로 구성되고 다른 채널은 application.properties을 통해 구성됩니다.이 경우 스트림 정의 또는 배포시에 사이드 채널 대상을 구성하는 것이 더 나을 수 있습니다 - http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties 참조.

이것들은 데이터가 나란히 흘러가는 것으로 가정 할 때, 일반 구성 요소를 사용하여 별도의 엔드 포인트를 청취하는 2 개의 스트림이 될 수 있습니다.

+0

안녕하세요 마리우스, 답변 해 주셔서 감사합니다. 이제 작동합니다. 정말 고마워요. 정말 도움이됩니다. –

+0

그런데 오늘 오후에 YARN에서 테스트를 수행했을 때 YARN의 SCDF가/dataflow/artifacts/cache 폴더의 Apps를 캐시 할뿐만 아니라 데이터베이스와 같은 다른 곳에서도 캐시 된 것으로 나타났습니다. DB에 앱을 캐시하고 있는지 잘 모르겠습니다. 내가 스트림을 파괴하고 Apps를 등록 취소하고/dataflow/artifacts/cache 폴더의 모든 파일을 삭제 했으므로 캐시 된 Apps를 사용하여 Yarn의 SCDF가 여전히 배포 된 것으로 나타났습니다. 마침내, 나는 선택의 여지가 있지만 SCDF 서버를 다시 시작하면 작동합니다. 서버를 다시 시작하지 않고 캐시 데이터를 완전히 지우는 방법을 알고 계십니까? 감사합니다. –

+0

이 [문서의 섹션] (http://docs.spring.io/spring-cloud-dataflow-server-yarn/docs/1.0.1.RELEASE/reference/htmlsingle/#yarn-how- it-works) 유용 할 수 있습니다. 캐싱은'/ dataflow/artifacts/cache'와'hdfs' 디렉토리에서 이루어 지므로 캐쉬를 지울 때마다 서버를 재시작해야 할 수도 있습니다. 이 워크 플로우에 대한 피드백 및/또는 개선 사항을 언제든지 공유하십시오. –