2016-11-24 4 views
3

파일 추가를 위해 디렉토리를 모니터링하는 코드 조각이 있습니다. 디렉토리에 새 파일이 추가 될 때마다 파일 내용이 선택되어 kafka에 게시 된 다음 파일이 삭제됩니다.java - 프로세스가 다른 프로세스에서 사용 중이기 때문에 프로세스가 파일에 액세스 할 수 없습니다.

이것은 하나의 요청을 만들었지 만 코드를 jMeter에서 5 명 또는 10 명의 사용자 요청을 받으면 내용이 kafka에 성공적으로 게시되지만 코드는 파일을 삭제할 수 없습니다. 나는 The process cannot access the file because it is being used by another process.이라는 메시지와 함께 FileSystemException을 얻습니다.

내가 볼 수없는 몇 가지 동시성 문제가 있다고 생각합니다. 도와주세요 !

public void monitor() throws IOException, InterruptedException { 
    Path faxFolder = Paths.get(TEMP_FILE_LOCATION); 
    WatchService watchService = FileSystems.getDefault().newWatchService(); 
    faxFolder.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); 
    boolean valid = true; 
    do { 
     WatchKey watchKey = watchService.take(); 
     for (WatchEvent<?> event : watchKey.pollEvents()) { 
      if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) { 
       String fileName = event.context().toString(); 
       publishToKafka(new File(TEMP_FILE_LOCATION + fileName).toPath(), "topic"); 
      } 
     } 
     valid = watchKey.reset(); 
    } while (valid); 
} 

private void publishToKafka(Path path, String topic) { 
    try (BufferedReader reader = Files.newBufferedReader(path)) { 
     String input = null; 
     while ((input = reader.readLine()) != null) { 
      kafkaProducer.publishMessageOnTopic(input, topic); 
     } 
    } catch (IOException e) { 
     LOG.error("Could not read buffered file to send message on kafka.", e); 
    } finally { 
     try { 
      Files.deleteIfExists(path); // This is where I get the exception 
     } catch (IOException e) { 
      LOG.error("Problem in deleting the buffered file {}.", path.getFileName(), e); 
     } 
    } 
} 

예외 로그 :

java.nio.file.FileSystemException: D:\upload\notif-1479974962595.csv: The process cannot access the file because it is being used by another process. 

    at sun.nio.fs.WindowsException.translateToIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source) 
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source) 
    at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source) 
    at java.nio.file.Files.deleteIfExists(Unknown Source) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.publishToKafka(MonitorDirectory.java:193) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.sendData(MonitorDirectory.java:125) 
    at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.run(MonitorDirectory.java:113) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
+1

파일을 읽은 다음 삭제하기 전에 파일을 만드는 프로세스가 파일을 작성하지 않았습니까 (닫지 않았습니까?)? –

+0

그건 그렇고, WatchService는 닫기가 가능하므로 try-with-resources 안에'newWatchService()'를 넣으십시오. 또한, 다른 곳에서는 nio를 사용하고 있기 때문에'new File (...). toPath() '대신에 일관성을 위해'faxFolder.resolve (filename)'을 사용하십시오. –

+3

@Klitos Kyriacou : 파일 시스템을 볼 때'WatchEvent'의 컨텍스트가 이미'Path'이므로 문자열 변환도 쓸모 없으며'faxFolder.resolve ((Path) event.context())'로 충분합니다 . 제 생각에, 당신 말이 맞습니다. 창조 사건에 즉각적으로 반응하는 것은 창조자에게 파일을 닫을 충분한 시간을주지는 않습니다. 흥미롭게도, 필자의 시스템에서는 그 반대의 효과가있다 : 파일을 읽는 것이 여전히 쓰여지는 것처럼 실패하지만, 작동이 삭제되면 시스템이 실제 제거를 연기하므로 작성자가 파일을 닫는다. – Holger

답변

0

당신은 그것을 삭제하기 전에 해당 파일에 액세스하는 모든 연결을 종료해야합니다.

+1

'try-with-resources'가'finally' 코드를 실행하기 전에 그것을하고 있다고 생각합니다. –

0

코드를 보면 한 파일을 다시 게시하기 위해 스레드가 선택했을 때 다른 스레드가 게시를 위해이를 선택하는 것 같습니다. 그래서 아무도 그것을 삭제할 수 없습니다. 동시성 문제 만 가능해야합니다. 동시에 실행할 수있는 단계와 될 수없는 단계를 기준으로 코드를 재 설계해야합니다. 전체 과정에서 그래서 단계는 다음과 같습니다

  1. 파일을 선택
  2. 파일 (그것을 할 수있는 다른 스레드 전화)
  3. 파일을 삭제 (호출 스레드를해야한다 게시 (주 스레드를 수행해야합니다) 모든 파일의 존재는 (다시 메인 스레드는 또한

) 파일이 선택되는 순간 그것을 할 수 있다면)

  • 체크를 삭제, 당신은 버퍼로 읽어을 삭제 한 다음 게시를 계속할 수 있습니다. 이렇게하면 주 스레드가이 파일을 다른 스레드에 할당하지 않도록합니다.

  • +1

    OP 게시물에 쓰레드가 사용되지 않습니다. –

    0

    동적으로 생성 된 파일을 대기열 서비스에 업로드하려고 할 때 다음 스레드와 비슷한 문제가 발생했습니다. Multithreading on Queue 해결하기 위해 2 일이 걸렸습니다. 위의 대답을 준 Holger 덕분에 잠금이 다른 스레드가 읽을 때 완전히 완료되지 않았기 때문에 발생할 수 있으므로 시간이 많이 절약되었습니다.

    WatchEvent<Path> ev = cast(event); 
    Path name = ev.context(); 
    Path child = dir.resolve(name); 
    //queueUploadFile(child); 
    if (kind == ENTRY_CREATE) { 
        uploadToQueue(this.queueId, child); 
    } 
    

    나는 그것을 변경 :

    인터넷에서 찾은 많은 나의 초기 솔루션이었다

    WatchEvent<Path> ev = cast(event); 
    Path name = ev.context(); 
    Path child = dir.resolve(name); 
    //queueUploadFile(child); 
    if (kind == ENTRY_MODIFY) { 
        uploadToQueue(this.queueId, child); 
    } 
    

    그리고 모든 것을 완벽하게 작동합니다. "여하튼"여러 개의 ENTRY_MODIFY 이벤트 발생 (중복 된 파일 업로드)을 처리하기 위해 업로드 된 uploadToQueue() 메서드 내에서 파일에 대한 삭제 작업을 수행합니다.

    나는 위의 공헌도를 바탕으로 한 나의 접근법이 비슷한 문제를 가진 다른 사람들에게도 도움이되기를 바랍니다.