0

처리 된 데이터를 BlockingCollection에 추가하여 Parallel.ForEach을 통해 많은 양의 텍스트 파일을 처리하려고합니다.while (true) 루프를 EventWaitHandle로 변환 할 수 있습니까?

문제는 Task taskWriteMergedFile이 콜렉션을 사용하고 적어도 800000 라인마다 결과 파일에 기록하기를 원합니다.

Task을 생성 할 수 있도록 병렬 처리되어 있으므로 반복 내에서 컬렉션 크기를 테스트 할 수 없다고 생각합니다.

이 경우 작업의 while (true) 루프를 EventWaitHandle으로 변환 할 수 있습니까?

const int MAX_SIZE = 1000000; 
static BlockingCollection<string> mergeData; 
mergeData = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_SIZE); 


string[] FilePaths = Directory.GetFiles("somepath"); 

var taskWriteMergedFile = new Task(() => 
{ 
    while (true) 
    { 
     if (mergeData.Count > 800000) 
     { 
      String.Join(System.Environment.NewLine, mergeData.GetConsumingEnumerable()); 
      //Write to file 
     } 
     Thread.Sleep(10000); 
    } 
}, TaskCreationOptions.LongRunning); 

taskWriteMergedFile.Start(); 
Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 
mergeData.CompleteAdding(); 

답변

1

아마도 그렇게하지 않으실 것입니다. 대신, 각 행을받은 파일에 기록하십시오. 파일 크기를 80,000 줄로 제한하려면 80,000 줄을 작성한 후 현재 파일을 닫고 새 파일을여십시오.

GetConsumingEnumerable()은 컬렉션이 추가 완료로 표시 될 때까지 멈추지 않으므로 생각하면됩니다. 대기열에있는 항목이 80,000 개가 될 때까지 잠자기 루프를 거쳐 주 스레드가 CompleteAdding을 호출 할 때까지 String.Join에서 차단됩니다. 충분한 데이터가 있으면 메모리가 부족합니다.

또한 아주 좋은 이유가 없다면 여기에서 ConcurrentBag을 사용하지 않아야합니다. BlockingCollection의 기본값 인 ConcurrentQueue을 사용하면됩니다. ConcurrentBagConcurrentQueue과 같이 잘 수행되지 않는 특수 목적의 데이터 구조입니다.

그래서 귀하의 작업이된다 : 당신이 다른 곳에서 출력 파일을 연 것을 물론, 가정

var taskWriteMergedFile = new Task(() => 
{ 
    int recordCount = 0; 
    foreach (var line in mergeData.GetConsumingEnumerable()) 
    { 
     outputFile.WriteLine(line); 
     ++recordCount; 
     if (recordCount == 80,000) 
     { 
      // If you want to do something after 80,000 lines, do it here 
      // and then reset the record count 
      recordCount = 0; 
     } 
    } 
}, TaskCreationOptions.LongRunning); 

. 작업 시작시 출력을 열고 foreach이 종료 한 후에 닫는 것이 좋습니다.

다른 점은 제작자 루프가 평행하지 않기를 바랍니다.

Parallel.ForEach(FilePaths, FilePath => AddToDataPool(FilePath)); 

내가 AddToDataPool가 무엇을하고 있는지 확실히 모르겠지만,이 파일을 읽고 컬렉션에 데이터를 쓰고, 당신은 몇 가지 문제가있다 : 당신은있다. 첫째, 디스크 드라이브는 한 번에 한 가지만 할 수 있으므로 한 파일의 일부분을 읽은 다음 다른 파일의 일부를 읽은 다음 다른 파일의 일부를 읽습니다. 다음 파일의 각 청크를 읽으려면 다음을 수행해야합니다. 머리를 올바른 위치로 찾으십시오. 디스크 헤드 검색은 5 밀리 초 이상으로 매우 비쌉니다. CPU 시간의 영원성. 파일을 읽는 것보다 시간이 많이 걸리는 작업을 수행하지 않는 한, 한 번에 한 파일을 처리하는 것이 좋습니다. 입력 파일이 별도의 물리적 디스크에 있음을 보장 할 수 없다면. . .

두 번째 잠재적 인 문제는 여러 스레드가 실행 중일 때 컬렉션에 기록되는 순서를 보장 할 수 없다는 것입니다. 물론 문제는 아니지만 단일 파일의 모든 데이터가 출력으로 그룹화 될 것으로 예상된다면 컬렉션에 여러 줄을 쓰는 여러 스레드에서 발생하지 않을 것입니다.

명심할 사항.