2014-11-14 2 views
0

나는 자정에 매일 작업을 실행하는 Grails 애플리케이션을 가지고있다. grails와 gpar로 엄청난 양의 데이터 처리하기

package threading 

import static grails.async.Promises.task 
import static groovyx.gpars.GParsExecutorsPool.withPool 

class ComplexJob { 
    static triggers = { 
     simple repeatInterval: 30 * 1000l 
    } 

    def execute() { 
     if (Person.count == 5000) { 
      println "Executing job"     
      withPool 10000, { 
       Person.listOrderByAge(order: "asc").each { p -> 
        task { 
         log.info "Started ${p}" 
         Thread.sleep(15000l - (-1 * p.age)) 
        }.onComplete { 
         log.info "Completed ${p}" 
        } 
       } 
      }     
     } 
    } 
} 

이 같은 repeatInterval 테스트 목적만을위한 것입니다 무시 : 내 예를 들어, 응용 프로그램에서 나는 석영 작업에서 다음 만 개 Person 기록을 가지고 않습니다. 작업이 나는 다음과 같은 예외를 얻을 실행됩니다 경우 :

2014-11-14 16:11:51,880 quartzScheduler_Worker-3 grails.plugins.quartz.listeners.ExceptionPrinterJobListener - Exception occurred in job: Grails Job 
org.quartz.JobExecutionException: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 [See nested exception: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000] 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:111) 
    at org.quartz.core.JobRunShell.run(JobRunShell.java:202) 
    at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) 
Caused by: java.lang.IllegalStateException: The thread pool executor cannot run the task. The upper limit of the thread pool size has probably been reached. Current pool size: 1000 Maximum pool size: 1000 
    at org.grails.async.factory.gpars.LoggingPoolFactory$3.rejectedExecution(LoggingPoolFactory.groovy:100) 
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) 
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) 
    at groovyx.gpars.scheduler.DefaultPool.execute(DefaultPool.java:155) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:305) 
    at groovyx.gpars.group.PGroup.task(PGroup.java:286) 
    at groovyx.gpars.dataflow.Dataflow.task(Dataflow.java:93) 
    at org.grails.async.factory.gpars.GparsPromise.<init>(GparsPromise.groovy:41) 
    at org.grails.async.factory.gpars.GparsPromiseFactory.createPromise(GparsPromiseFactory.groovy:68) 
    at grails.async.Promises.task(Promises.java:123) 
    at threading.ComplexJob$_execute_closure1_closure3.doCall(ComplexJob.groovy:20) 
    at threading.ComplexJob$_execute_closure1.doCall(ComplexJob.groovy:19) 
    at groovyx.gpars.GParsExecutorsPool$_withExistingPool_closure2.doCall(GParsExecutorsPool.groovy:192) 
    at groovyx.gpars.GParsExecutorsPool.withExistingPool(GParsExecutorsPool.groovy:191) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:162) 
    at groovyx.gpars.GParsExecutorsPool.withPool(GParsExecutorsPool.groovy:136) 
    at threading.ComplexJob.execute(ComplexJob.groovy:18) 
    at grails.plugins.quartz.GrailsJobFactory$GrailsJob.execute(GrailsJobFactory.java:104) 
    ... 2 more 
2014-11-14 16:12:06,756 Actor Thread 20 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
2014-11-14 16:12:06,756 Actor Thread 5 org.grails.async.factory.gpars.LoggingPoolFactory - Async execution error: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
java.lang.IllegalStateException: A DataflowVariable can only be assigned once. Only re-assignments to an equal value are allowed. 
    at groovyx.gpars.dataflow.expression.DataflowExpression.bind(DataflowExpression.java:368) 
    at groovyx.gpars.group.PGroup$4.run(PGroup.java:315) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

내가 아마이 계산을 할 수 withPool(10000) 를 사용하는 동안 스레드 풀은 10000로 설정되어 있지 않은 것처럼 보인다 (이제 인쇄 문을 로그를) 청크? 그렇다면 어떻게 최신 항목이 처리되었는지 (예 : 계속할 위치)를 알 수 있습니까?

+5

작은 풀 (예외가 표시되면 1000)을 사용하지 않는 이유는 무엇입니까? 작업을 수행하는 데 10000 개의 스레드를 만드는 것이 순차적으로 수행하는 것보다 빠르지는 않습니다. –

+3

Grails 앱에서 Spring 배치와 같이 일괄 처리를 위해 실제로 설계된 것을 사용하지 않는 이유는 무엇입니까? 그것이 내가하는 일이며, 잘 작동합니다. –

+0

풀 크기가 클수록 처리 속도가 빠릅니다. 얼마 전 나는 100 개의 쓰레드를 사용했고 실제로 JVM 효율성에 큰 문제를 야기했습니다. 시행 착오 끝에 15 개의 쓰레드만으로 충분했다. – Opal

답변

0

각 요소의 처리를 랩으로 처리하는 것이 최적의 것으로 보이지 않습니다. 병렬 처리를하는 표준 방법은 전체 작업을 적절한 수의 하위 작업으로 나눕니다. 이 번호를 선택하는 것부터 시작합니다. CPU 바운드 작업의 경우 N = 프로세서 수 작업을 만들 수 있습니다. 그런 다음 N 개의 하위 작업으로 작업을 분할합니다. 이처럼 :

persons = Person.listOrderByAge(order: "asc") 
nThreads = Runtime.getRuntime().availableProcessors() 
size = persons.size()/nThreads 
withPool nThreads, { 
    persons.collate(size).each { subList => 
     task { 
      subList.each { p => 
       ...  
      } 
     }   
    } 
} 
1

나는 작업이 기본 스레드 풀, withPool에서 생성되지 않은 하나를 사용하여 대부분이기 때문에 withPool() 메소드는 효과가 없습니다 생각한다. withPool()에 대한 호출을 제거하고 작업이 여전히 실행되는지 확인하십시오.

GPars의 groovyx.gpars.scheduler.DefaultPool 풀 (작업의 기본값)은 작업의 크기를 조정하며 1000 개의 동시 스레드 수를 제한합니다.

내가 대신 예를 들어 고정 된 크기의 풀을 만드는 게 좋을 것 :

def group = new DefaultPGroup(numberOfThreads) 
group.task {...} 

참고 : 상황이 약간 다를 수 있습니다, 그래서 나는 단지 핵심 GPars의 사람을 grails.async 작업에 익숙하지 않은 해요 grails.async의 PGroups 주변.