2014-12-25 17 views
0

최근에 Java 7 FORK/JOIN 프레임 워크와 FileChannel을 사용하여 파일을 복사하는 연습을하고 있습니다. 나는 내 코드, 스레드의 수가 1 인 경우 는, 파일이 정확하게 복사되지만를 실행포크/조인을 FileChannel과 결합하여 파일 복사

import java.io.File; 
import java.io.FileInputStream; 
import java.io.FileNotFoundException; 
import java.io.FileOutputStream; 
import java.io.IOException; 
import java.nio.channels.FileChannel; 
import java.nio.file.Files; 
import java.nio.file.Paths; 
import java.util.ArrayList; 
import java.util.concurrent.ForkJoinPool; 
import java.util.concurrent.RecursiveTask; 
import java.util.concurrent.TimeUnit; 
import java.util.logging.Level; 
import java.util.logging.Logger; 

public class Test { 
    private ArrayList<FileProcessor> processors = new ArrayList<FileProcessor>(); 

    public Test(){ 
     String outputDir = "C:\\temp"; 
     if (!Files.isDirectory(Paths.get(outputDir))) { 
      System.out.println("this is not a path"); 
     } else { 
      try { 
       //start copying file 
       ForkJoinPool pool = new ForkJoinPool(); 
       int numberOfThread = 2; 
       File file = new File("C:\\abc.cdm"); 

       long length = file.length(); 
       long lengthPerCopy = (long)(length/numberOfThread); 
       long position = 0L; 

       for (int i = 0; i < numberOfThread; i++) { 
        FileProcessor processor = null; 
        if (i == numberOfThread - 1) { 
         //the last thread 
         processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, length - position); 
        } else { 
         processor = new FileProcessor("abc.cdm", "C:\\abc.cdm", "C:\\temp", position, lengthPerCopy); 
         position = position + lengthPerCopy + 1; 
        } 
        processors.add(processor); 
        pool.execute(processor); 
       } 

       do { 
        System.out.printf("******************************************\n"); 
        System.out.printf("Main: Parallelism: %d\n", pool.getParallelism()); 
        System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount()); 
        System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount()); 
        System.out.printf("Main: Steal Count: %d\n", pool.getStealCount()); 
        System.out.printf("******************************************\n"); 
        try 
        { 
         TimeUnit.SECONDS.sleep(1); 
        } catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 
       } while (!isDone()); //when all the thread not been done 

      pool.shutdown(); 
      System.out.println("copy done"); 
      } catch (Exception ex) { 
       //out an error here... 
      } 
     } 

    } 

    private boolean isDone(){ 
     boolean res = false; 
     for (int i = 0; i < processors.size(); i++) { 
      res = res || processors.get(i).isDone(); 
     } 
     return res; 
    } 

    public static void main(String args[]) { 
     Test test = new Test(); 
    } 

    class FileProcessor extends RecursiveTask<Integer> 
    { 
     private static final long serialVersionUID = 1L; 
     private long copyPosition; 
     private long copyCount; 
     FileChannel source = null; 
     FileChannel destination = null; 
     //Implement the constructor of the class to initialize its attributes 
     public FileProcessor(String fileName, String filePath, String outputPath, long position, long count) throws FileNotFoundException, IOException{ 
      this.copyPosition = position; 
      this.copyCount = count; 
      this.source = new FileInputStream(new File(filePath)).getChannel().position(copyPosition); 
      this.destination = new FileOutputStream(new File(outputPath + "/" + fileName), true).getChannel().position(copyPosition); 
     } 

     @Override 
     protected Integer compute() 
     { 
      try { 
       this.copyFile(); 
      } catch (IOException ex) { 
       Logger.getLogger(FileProcessor.class.getName()).log(Level.SEVERE, null, ex); 
      } 
      return new Integer(0); 
     } 

     private void copyFile() throws IOException { 

      try { 
       destination.transferFrom(source, copyPosition, copyCount); 
      } 
      finally { 
       if (source != null) { 
        source.close(); 
       } 
       if (destination != null) { 
        destination.close(); 
       } 
      } 
     } 
    } 

} 

theads의 수는 2, 파일 "C 일 때 : : 여기에 내 코드 (Test.java)입니다 \ ABC .cdm "은 77KB (78335)이지만 복사 된 후"C : \ temp \ abc.cdm "파일은 단지 (39KB)입니다.

어디서 잘못 됐습니까?

업데이트 : 문제의 isDone 방법에 내 문제가있다 해결할 수있는 문제 , 그것은해야합니다 :

File file = new File(selectedFile[i].getPath()); 
long length = file.length(); 
new RandomAccessFile("C:\\temp\abc.cdm", "rw").setLength(length); 

이를 :

boolean res = true; 
for (int i = 0; i < processors.size(); i++) { 
    res = res && processors.get(i).isDone(); 
} 
return res; 

은 또한 코드의 다음 줄을 편집 FORK/JOIN 사용법에 불과합니다.

+0

당신이 게시 코드는 컴파일되지 않습니다 - 당신은 브라켓 문제가를 ... – alfasin

+0

안녕하세요 alfasin, I 코드를 수정했습니다! – user2758327

+0

이 문제는 여전히 분석의 근본 원인이 아니지만 isDone() 메서드는 의심 스럽습니다. 'FileProcessor' 중 적어도 하나가 완료되면'true'를 리턴합니다. 그러나 모든 FileProcessor가 완료되면 true를 반환한다는 것이 그 의도라고 생각합니다. '||'이 아닌 '&&'또는 단순히 다음과 같아야합니다 :'private boolean isDone() {for (final FileProcessor p : processors) if (! p.isDone()) false를 반환합니다. 참을 돌려라. }'. –

답변

2

isDone() 메서드는 실제로 잘못되어 원래 질문에서 수정했습니다. 그러나 FileProcessor에는 또 다른 문제가 있습니다. 파일의 끝을 지나서 목적지의 위치를 ​​설정하면 파일을 전송할 때 자동으로 파일이 커지게된다고 가정합니다. 그렇지 않다.

첫 번째 세그먼트는 쓰기 위치가 0이고 파일 길이가 0보다 작을 수 없기 때문에 항상 씁니다. 총 파일 크기의 절반에 해당하는 39K입니다. 두 번째 부분은 결코 쓰여지지 않았습니다.

, 당신은 시작에서 다음을 수행 할 수 있습니다 실행하는 코드를 얻기 위해

:

File file = new File("C:\\abc.cdm"); 
long length = file.length(); 

new RandomAccessFile("C:\\temp\\abc.cdm", "rw").setLength(length);` 
+0

감사합니다. 효과가 있습니다! :디 – user2758327