2016-09-02 12 views
3

큰 리턴 값에 대해서는 mclapply에서 실행 중이던 버그를보고하려고했습니다.병렬 함수로 인한 오버 헤드 최소화 R

은 분명히 버그 개발 버전에서 수정되었습니다,하지만 난 응답자가 만든 그 의견에 더 관심이 있어요 :

가 직렬화 된 객체의 크기에 2GB의 제한이 있던 예를 들어, mclapply는 분기 된 프로세스에서 리턴 할 수 있으며이 예제는 16GB를 시도합니다. 그가 해제 된 R-의 devel (64 비트에 대한 것은 빌드)하지만, 이러한 사용은

하는 경우 (예 때문에 (UN) 직렬화에 관련된 모든 사본의 150GB ca를 필요로) 매우 독특하고 오히려 비효율적이다 큰 데이터로 병렬 계산을하기 위해 mclapply를 사용하면 비효율적입니다. 그러면 더 좋은 방법은 무엇일까요? 이런 종류의 일을해야하는 나의 필요성은 점점 커지고 있으며 나는 모든 곳에서 병목 현상을 확실히 겪고 있습니다. 필자가 본 튜토리얼은 기능을 사용하는 방법에 대해 아주 기본적인 소개를했지만 트레이드 오프 관리에 효과적으로 기능을 사용하는 방법은 아닙니다.

mc.preschedule : 문서이 트레이드 오프에 작은 선전을 가지고 'TRUE'다음 계산이 제 많은 작업이 코어는 다음이다 (최대)로 나뉘어져로 설정하면 작업이 시작되고 각 작업이 두 개 이상의 값을 포함 할 수 있습니다. 'FALSE'로 설정하면 각각의 값에 대해 하나의 작업이 분기됩니다. 전자는 짧은 계산에 더 좋거나 'X'에 많은 수의 값이 있으며, 후자는 완료 시간의 변동이 크고 의 'mc.cores'에 비해 값이 많지 않은 의 작업에 더 좋습니다

코어 (현재 값이 확산되고이 있기 때문에

기본적으로

는 ('mc.preschedule = TRUE') 입력 'X'는 만큼의 부분으로 분할되는 코어를 순차적으로 가로 질러 , 즉 첫 번째 값을 코어 1로, 두 번째 값을 에서 코어 2로 , ... (코어 + 1) 값을 코어 1 등)로 변환 한 다음 하나의 프로세스를 각 코어로 분기하고 결과를 수집합니다.

미리 스케줄을 지정하지 않으면 'X'값마다 별도의 작업이 분기됩니다. 더 그 수는 마스터 프로세스를 포크 된 후 'mc.cores'작업이 한 번 에서 실행하면 다음 포크 신뢰성이 소요 이러한 것들을 벤치마킹

전에 완료하는 아이를 위해 을 대기보다 더하지 않은지 확인하려면 몇 가지 문제가 규모 대로만 나타 났으므로 많은 시간이 소요됩니다. 그런 다음 어떤 일이 벌어지고 있는지 파악하기가 어렵습니다. 따라서 함수의 동작에 대한 더 나은 통찰력이 도움이 될 것입니다.

편집 : 나는 mclapply 많이 사용하고 더 나은 성능에 미치는 영향에 대해 생각하는 방법을 알고 싶었 기 때문에

나는, 구체적인 예를 가지고 있지 않습니다. 그리고 디스크에 쓰는 동안 오류가 발생하지만 필자는 디스크 직렬화를 통해 발생해야하는 (비) 직렬화에 도움이 될 것이라고 생각하지 않습니다.M 자체가 메모리에 적합하지 않기 때문에 큰 희소 행렬 M을 가지고, 그리고 (M1-M100 말) 덩어리 디스크에 기록 :

한 워크 플로는 다음과 같이 될 것이다.

는 지금은 사용자 수준에서 추가 집계 할 M에서 Ci 열이있는 I의 각 사용자 i에 대해 말한다. 작은 데이터를이 비교적 단순 것이다 :

m = matrix(runif(25), ncol=5) 
df = data.frame(I=sample(1:6, 20, replace=T), C=sample(1:5, 20, replace=T)) 
somefun = function(m) rowSums(m) 
res = sapply(sort(unique(df$I)), function(i) somefun(m[,df[df$I == i,]$C])) 

하지만 큰 데이터

내 접근법/다른 data.frames에 열이 열이있을 것이다 행렬 M1-M100에 기초하여 사용자의 data.frame 분할했다 그 data.frames에 대해 병렬 루프를 수행하고, 관련 매트릭스를 읽고, 사용자를 반복하고, 열을 추출하고, 함수를 적용한 다음 출력 목록을 가져 와서 다시 반복하고 다시 집계합니다.

이렇게 재구성 될 수없는 함수가있는 경우 (현재로서는 걱정할 필요가 없습니다.)이 방법으로 너무 많은 데이터를 뒤섞고있는 것은 이상적이지 않습니다.

+1

문제의 구조를 알지 못하면 어떻게하면 더 잘할 수 있는지 알 수 없습니다. 버그 리포트에서 예제를 살펴 보았습니다. 아마도 큰 오브젝트를 리턴 할 수 없다는 것을 보여줄 것입니다. 공유 메모리 패키지를 살펴볼 수도 있습니다.문제의 구조를 포착하는 간단한 재현 가능한 예제를 게시해야하지만 이해하기에 충분히 간단합니다. 나는'mc.preschedule'과 같은 작업 스케줄링이 당신을 어디든지 데려 올 것이라고 생각하지 않는다. – cryo111

+1

나는 내가하고있는 일에 대한 간단한 예제를 게시했지만, 디스크 IO, 직렬화 및 이러한 문제로 인해 발생하는 모든 문제가 복잡합니다. 내가 뭘 하려는지 분명히하고 있니? – James

답변

0

중간 크기 N에 대한 오버 헤드를 제한하려면 mc.preschedule = TRUE을 사용하는 것이 좋습니다 (즉, 코어가있는만큼 많은 덩어리로 작업 분할).

메모리 사용과 CPU 간의 주요 단점이있는 것 같습니다. 즉, 진행중인 프로세스가 RAM을 최대한 활용할 때까지만 병렬화 할 수 있습니다. 한 가지 고려해야 할 점은 여러 작업자가 중복없이 R 세션에서 동일한 개체를 읽을 수 있다는 것입니다. 따라서 병렬 함수 호출에서 수정/생성 된 객체 만 각 코어에 대해 메모리 풋 프린트가 추가됩니다.

메모리를 최대로 사용한다면 전체 계산을 여러 개의 하위 작업으로 나눠서 순차적으로 루프를 돌리고 (예 : lapply로) 루프 내에서 mclapply를 호출하여 각 하위 작업을 병렬 처리하는 것이 좋습니다. 하위 작업의 출력을 디스크에 저장하여 모든 것을 메모리에 보관하지 않도록합니다.

2

내 대답은 너무 늦지 않았 으면 좋겠지 만, 귀하의 예제는 bigmemory 패키지를 통해 공유 메모리/파일을 사용하여 처리 할 수 ​​있다고 생각합니다.

은의 당신이 실행하려는 기능으로 이동하자의 지금 데이터

library(bigmemory) 
library(parallel) 

#your large file-backed matrix (all values initialized to 0) 
#it can hold more than your RAM as it is written to a file 
m=filebacked.big.matrix(nrow=5, 
         ncol=5, 
         type="double", 
         descriptorfile="file_backed_matrix.desc", 
         backingfile="file_backed_matrix", 
         backingpath="~") 

#be careful how to fill the large matrix with data 
set.seed(1234) 
m[]=c(matrix(runif(25), ncol=5)) 
#print the data to the console 
m[] 

#your user-col mapping 
#I have added a unique idx that will be used below 
df = data.frame(unique_idx=1:20, 
       I=sample(1:6, 20, replace=T), 
       C=sample(1:5, 20, replace=T)) 

#the file-backed matrix that will hold the results 
resm=filebacked.big.matrix(nrow=nrow(df), 
          ncol=2, 
          type="double",init = NA_real_, 
          descriptorfile="res_matrix.desc", 
          backingfile="res_backed_matrix", 
          backingpath="~") 

#the first column of resm will hold the unique idx of df 
resm[,1]=df$unique_idx 
resm[] 

을 만들어 보자. rowSums을 작성했지만 귀하의 텍스트에서 당신이 의미 한 바를 colSums으로 추측했습니다. 나는 그것을 적절하게 바꿨다.

somefun = function(x) { 
    #attach the file-backed big.matrix 
    #it makes the matrix "known" to the R process (no copying involved!) 
    #input 
    tmp=attach.big.matrix("~/file_backed_matrix.desc") 
    #output 
    tmp_out=attach.big.matrix("~/res_matrix.desc") 

    #store the output in the file-backed matrix resm 
    tmp_out[x$unique_idx,2]=c(colSums(tmp[,x$C,drop=FALSE])) 
    #return a little more than the colSum result 
    list(pid=Sys.getpid(), 
     user=x$I[1], 
     col_idx=x$C) 
} 

모든 코어의 병렬 계산을 수행

#perform colSums using different threads 
res=mclapply(split(df,df$I),somefun,mc.cores = detectCores()) 

점검

#processes IDs 
unname(sapply(res,function(x) x$pid)) 
#28231 28232 28233 28234 28231 28232 

#users 
unname(sapply(res,function(x) x$user)) 
#1 2 3 4 5 6 

#column indexes 
identical(sort(unname(unlist(sapply(res,function(x) x$col_idx)))),sort(df$C)) 
#[1] TRUE 

#check result of colSums 
identical(lapply(split(df,df$I),function(x) resm[x$unique_idx,2]), 
      lapply(split(df,df$I),function(x) colSums(m[,x$C,drop=FALSE]))) 
#[1] TRUE 

편집 결과 : 내 편집에 당신의 코멘트를 해결했다. 파일 지원 출력 매트릭스 resm에 결과를 저장하면 예상대로 작동합니다.

+0

Rdsm과 bigmemory 패키지를 사용하여 각 프로세스의 결과를 행렬의 열에 써서 정확하게 처리하려고했습니다. 각 프로세스가 한 번만 열에 쓸 수 있도록 설정했습니다. 그러나 결과는 작은 장난감 예제에서 동일하지 않게되었으므로 공유 메모리를 통해 동일한 행렬에 서로 다른 열을 쓰는 것이 스레드로부터 안전한 작업이 아닌 것처럼 보였습니다. 내 코드에 버그가 있었을 수도 있습니다. 나중에 그 시도의 글을 올리려고합니다. 감사! – James

+0

@James 제 편집문에 귀하의 의견이 반영되었습니다. 내 대답은 이제 공유 매트릭스에 출력도 포함됩니다. 코드는 예상대로 작동합니다. – cryo111