3

"작업 단위"가 특정 순서없이 수행되어야하는 Iterable입니다. 서로 간섭하지 않고 병렬로 쉽게 실행할 수 있습니다.스칼라 병렬 비 순차 반복기

불행히도 한 번에 너무 많은 양을 실행하면 사용 가능한 RAM이 초과되므로 주어진 시간에 한 번에 한꺼번에 실행해야합니다. 내가 어디의 지식을 유지하려는 경우 출력 Iterator이 (입력과 같은 순서로 필요는 없다

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B] 

있도록 : 가장 기본적인에서

, 나는이 유형 서명 기능을 원하는 결과는 소비자가 입력 또는 다른 것으로 쌍을 출력 할 수 있습니다.) 소비자는이 작업에서 가능한 한 많은 병렬 처리를 유지하면서 컴퓨터의 메모리를 모두 먹지 않고 결과 반복기를 점차적으로 소비 할 수 있습니다.

또한 가능한 한 효율적으로 기능을 원합니다. 예를 들어 다음과 같은 라인을 따라 뭔가를 위해 내가 가진 초기 생각했다 : 나는 즉시 준비했다으로 그 반복자의 요소를 생산 시작할 수 있다는 스칼라의 병렬 수집을 알리는 것 toSet을 기대했다

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator) 

, 어떤 순서로든 호출하고, grouped 콜은 동시 작업자의 수를 제한하는 것이었다. 불행하게도 toSet 호출이 원하는 효과를 얻지 못합니다 (결과는 내 실험에서 par 호출없이 동일한 순서로 반환됩니다). grouped 호출이 차선입니다. 예를 들어, 그룹 크기가 100이고 이러한 작업 중 99 개가 12 개의 코어에서 즉시 완료되지만 그 중 하나가 특히 느린 경우 나머지 코어의 대부분은 다음 그룹으로 이동할 때까지 유휴 상태가됩니다. 내 청크 크기만큼 크지 만 느린 작업자가지지하지 않는 "적응 형 창"을 갖는 것이 훨씬 더 명확합니다.

필자는 작업 훔치기 (대기열) 또는 그 행을 따라 뭔가를 쓰는 것을 상상할 수 있지만 동시성 기본 요소를 다루는 많은 노력이 이미 내게 행해졌다고 생각합니다. 스칼라의 병렬 콜렉션 라이브러리에서 레벨. 누구든지이 기능을 구현하기 위해 재사용 할 수있는 부분을 알고 있습니까? 아니면 이러한 작업을 구현하는 방법에 대한 다른 제안이 있습니까?

답변

3

병렬 수집 프레임 워크를 사용하면 주어진 작업에 사용할 최대 스레드 수를 지정할 수 있습니다. 스칼라 - 2.10 사용하여, 당신은하고 싶은 것 :

def parMap[A,B](x : Iterable[A], f : A => B, chunkSize : Int) = { 
    val px = x.par 
    px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize)) 
    px map f 
} 

이 한 번에 실행 이상 chunkSize 작업을 방지 할 수 있습니다. 이렇게하면 배우를 계속 작동시킬 수있는 작업 도용 전략이 사용되므로 위에 나온 grouped 예제와 동일한 문제가 발생하지 않습니다.

이렇게하면 결과를 처음 완료된 순서로 다시 정렬하지 않습니다. 이를 위해서는 액터로 작업을 전환하고 작업을 실행하는 작은 액터 풀을 가지고 완료 한 결과를 다시 보내는 것과 같은 것을 제안합니다.

+0

리오 더링 지원이 없으면 특정 요소가 계산하는 데 시간이 걸리면 작동하지 않을까요? –

+0

'할 일'이 무엇을 의미 하느냐에 따라 다른 요소가 계속 컴퓨팅되지만 병렬 계산을 시작한 스레드는 모든 것이 완료 될 때까지 차단되므로 모든 것이 완료 될 때까지 액세스 할 수 없습니다.따라서 이터레이터의 첫 번째 요소가 오래 걸리면 마지막으로 계산되는 것을 멈추지 않지만 액세스가 중지됩니다. – Impredicative

+0

글쎄, 주요 관심사 중 하나는 메모리 사용량입니다. 그래서 실제로 마지막 요소를 계산하지 못하게하려고합니다. out-of-order 결과는 실제로 이런 종류의 "대기"를 덜 수있는 최적화입니다. –