0

저는 'spring-cloud-dataflow'라는 새로운 프로젝트를 시작하여 필요에 맞게 병을 개발했습니다.프로세서가 동일한 페이로드로 여러 번 메시지를 여러 번 수신합니다.

이 파일 중 하나는 파일 소스의 파일을 untar 처리하는 프로세서이며,이 응용 프로그램은 tar 및 gunzip 파일 압축을 처리하는 기능이있는 integration-zip의 사용자 정의 버전을 사용합니다.

내 문제는 다음과 같습니다. 내 소스가 파일 참조와 함께 단일 메시지를 보내는 동안 프로세서는 동일한 메시지를 여러 번 수신하지만 동일한 페이로드이지만 다른 ID를 사용합니다.

Here the log file of both component

당신이 파일을 볼 수 있듯이

는 메시지에 생산 :

2017-10-02 12:38:28.013 INFO 17615 --- [ask-scheduler-3] o.s.i.file.FileReadingMessageSource  : Created message: [GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={id=0b99b840-e3b3-f742-44ec-707aeea638c8, timestamp=1506940708013}]] 

생산자가 들어오는 3 메시지를 가지고있는 동안 : 나는이 문제에 대한 해결책을 찾을 수 없습니다

2017-10-02 12:38:28.077 INFO 17591 --- [   -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer 
2017-10-02 12:38:28.080 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=1 
a4d4b9c-86fe-d3a8-d800-8013e8ae7027, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940708079}]' unpacking started... 
2017-10-02 12:38:28.080 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:29.106 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=c 
d611ca4-4cd9-0624-0871-dcf93a9a0051, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940709106}]' unpacking started... 
2017-10-02 12:38:29.107 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:31.108 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Message 'GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}]' unpacking started... 
2017-10-02 12:38:31.108 INFO 17591 --- [   -L-1] .w.c.s.a.c.p.AbstractCompressTransformer : Check message's payload type to decompress 
2017-10-02 12:38:31.116 ERROR 17591 --- [   -L-1] o.s.integration.handler.LoggingHandler : org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is org.springframework.messaging.MessageHandlingException: Failed to apply Zip transformation.; nested exception is java.io.FileNotFoundException: /tmp/patent/CNINO_im_201733_batch108.tgz (File o directory non esistente), failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}], failedMessage=GenericMessage [payload=/tmp/patent/CNINO_im_201733_batch108.tgz, headers={kafka_offset=1, id=97171a2e-29ac-2111-b838-3da7220f5e3c, kafka_receivedPartitionId=0, contentType=application/x-java-object;type=java.io.File, kafka_receivedTopic=untar.file, timestamp=1506940711108}] 
     at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) 

을 누구나 똑같은 문제가 있으며 해결 방법을 찾았습니까? 또는 내가 놓친 구성이 있습니까?

편집 :

내가 같은 파일 시스템에 SDFS 버전 1.2.2.RELEASE의 로컬 버전, 그래서 IO 파일 조작 작업을 사용하고

, 나는 SCS의 버전 Ditmars.BUILD - 스냅 샷을 사용합니다.

파일 삭제 작업 응용 프로그램을 비활성화하면 유감스럽게도이 응용 프로그램은 메시지를 여러 번 처리합니다.

@Override 
    protected Object doCompressTransform(final Message<?> message) throws Exception { 
    logger.info(String.format("Message '%s' unpacking started...", message)); 

    try (InputStream checkMessage = checkMessage(message); 
     InputStream inputStream = (gzCompression ? new BufferedInputStream(new GZIPInputStream(checkMessage)) : new BufferedInputStream(checkMessage))) { 

     final Object payload = message.getPayload(); 
     final Object unzippedData; 

     try (TarArchiveInputStream tarIn = new TarArchiveInputStream(inputStream)){   
     TarArchiveEntry entry = null; 

     final SortedMap<String, Object> uncompressedData = new TreeMap<String, Object>(); 

     while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) { 

      final String zipEntryName = entry.getName(); 
      final Date zipEntryTime = entry.getLastModifiedDate(); 
      final long zipEntryCompressedSize = entry.getSize(); 

      final String type = entry.isDirectory() ? "directory" : "file"; 

      final File tempDir = new File(workDirectory, message.getHeaders().getId().toString()); 
      tempDir.mkdirs(); // NOSONAR false positive 

      final File destinationFile = new File(tempDir, zipEntryName); 

      if (entry.isDirectory()) { 
      destinationFile.mkdirs(); // NOSONAR false positive 
      } 
      else { 
      unpackEntries(tarIn, entry, tempDir); 
      uncompressedData.put(zipEntryName, destinationFile); 
      } 
     } 

     if (uncompressedData.isEmpty()) { 
      unzippedData = null; 
     } 
     else { 
      if (this.expectSingleResult) { 
      if (uncompressedData.size() == 1) { 
       unzippedData = uncompressedData.values().iterator().next(); 
      } 
      else { 
       throw new MessagingException(message, String.format("The UnZip operation extracted %s " 
         + "result objects but expectSingleResult was 'true'.", uncompressedData.size())); 
      } 
      } 
      else { 
      unzippedData = uncompressedData; 
      } 

     } 

     logger.info("Payload unpacking completed..."); 
     } 
     finally { 
     if (payload instanceof File && this.deleteFiles) { 
      final File filePayload = (File) payload; 
      if (!filePayload.delete() && logger.isWarnEnabled()) { 
      if (logger.isWarnEnabled()) { 
       logger.warn("failed to delete File '" + filePayload + "'"); 
      } 
      } 
     } 
     } 
     return unzippedData; 
    } 
    catch (Exception e) { 
     throw new MessageHandlingException(message, "Failed to apply Zip transformation.", e); 
    } 
} 

예외 :

@EnableBinding(Processor.class) 
@EnableConfigurationProperties(UnTarProperties.class) 
public class UnTarProcessor { 

    @Autowired 
    private UnTarProperties properties; 

    @Autowired 
    private Processor processor; 

    @Bean 
    public UncompressedResultSplitter splitter() { 
    return new UncompressedResultSplitter(); 
    } 

    @Bean 
    public UnTarGzTransformer transformer() { 
    UnTarGzTransformer unTarGzTransformer = new UnTarGzTransformer(properties.isUseGzCompression()); 
    unTarGzTransformer.setExpectSingleResult(properties.isSingleResult()); 
    unTarGzTransformer.setWorkDirectory(new File(properties.getWorkDirectory())); 
    unTarGzTransformer.setDeleteFiles(properties.isDeleteFile()); 

    return unTarGzTransformer; 
    } 

    @Bean 
    public IntegrationFlow process() { 

    return IntegrationFlows.from(processor.input()) 
     .transform(transformer()) 
     .split(splitter()) 
     .channel(processor.output()) 
     .get(); 
    } 
} 

이 파일의 압축을 해제하는 데 사용되는 핵심 방법 : 이것은 내 프로세서 클래스는

입니다 : 여기에 몇 가지 코드 조각, 그리고 난이 내 프로젝트 repo처럼 메서드 checkmessage()에 의해 throw됩니다.

protected InputStream checkMessage(Message<?> message) throws FileNotFoundException { 
     logger.info("Check message's payload type to decompress"); 

     InputStream inputStream; 
     Object payload = message.getPayload(); 

     if (payload instanceof File) { 
     final File filePayload = (File) payload; 

      if (filePayload.isDirectory()) { 
       throw new UnsupportedOperationException(String.format("Cannot unzip a directory: '%s'", 
         filePayload.getAbsolutePath())); 
      } 

      inputStream = new FileInputStream(filePayload); 
     } 
     else if (payload instanceof InputStream) { 
      inputStream = (InputStream) payload; 
     } 
     else if (payload instanceof byte[]) { 
      inputStream = new ByteArrayInputStream((byte[]) payload); 
     } 
     else { 
      throw new IllegalArgumentException(String.format("Unsupported payload type '%s'. " + 
        "The only supported payload types are java.io.File, byte[] and java.io.InputStream", 
        payload.getClass().getSimpleName())); 
     } 

     return inputStream; 
} 

정말 고맙습니다. 어떤 도움. 고마워요

답변

1

더 많은 정보가 필요합니다. SCDF 및 SCS 응용 프로그램의 버전. 최소한 앱 배포 방법에 대한 DSL.

로그를 확인한 결과 소비자가 FileNotFoundException으로 인해 메시지를 소비하지 않는다는 것을 알고 계셨습니까? 동일한 메시지를 여러 번 수신하지 못하는 경우 SCS는 오류가 발생하기 전에 재전송을 시도하고 있습니다. 전체 로그를 확인하고 지정된 위치에서 파일을 열지 못하는 방법을 확인하십시오.

+0

'파일 참조가있는 단일 메시지'로는 아무것도 표시되지 않습니다. 글쎄, 당신은 귀하의 프로세서 애플 리케이션은 동일한 파일 시스템에 파일에 액세스 할 수 있는지 확인해야합니다.그렇지 않으면 다른 컴퓨터에'/ tmp/patent/CNINO_im_201733_batch108.tgz' 파일이 없다는 것이 명백합니다. –

+0

SCDF의 로컬 버전을 사용하고 있습니다. 파일은 두 응용 프로그램에서 읽을 수있는 임시 디렉토리에 저장됩니다. 첫 번째 메시지 전달 후 파일 i가 처리 될 때 FNF 예외가 발생합니다. 내 구성 및 코드에 대한 게시물 오순절 더 많은 정보를 편집했습니다. –

0

변압기에서 예외가 발생합니다. 오류가 논리에 있으므로 SCS의 구성을 다시 시도하여 여러 번 메시지를 수신합니다. 따르기가 어렵다. 그것은 말합니다 FileNotFoundException 나는 당신의 과정에서 그 파일을 넣는 이유가 무엇인지 모른다. SCS