2012-02-29 3 views
0

Scala 2.9/Akka 2.0 RC2 코드에서 동시성과 성능을 향상시킬 수있는 기회를 찾고 있습니다. 다음 코드를 감안할 때 : 나는 실행하면다음 Scala + Akka 코드에서 성능/동시성을 향상시킬 수있는 기회는 무엇입니까?

import akka.actor._ 

case class DataDelivery(data:Double) 

class ComputeActor extends Actor { 
    var buffer = scala.collection.mutable.ArrayBuffer[Double]() 

    val functionsToCompute = List("f1","f2","f3","f4","f5") 
    var functionMap = scala.collection.mutable.LinkedHashMap[String,(Map[String,Any]) => Double]() 
    functionMap += {"f1" -> f1} 
    functionMap += {"f2" -> f2} 
    functionMap += {"f3" -> f3} 
    functionMap += {"f4" -> f4} 
    functionMap += {"f5" -> f5} 

    def updateData(data:Double):scala.collection.mutable.ArrayBuffer[Double] = { 
     buffer += data 
     buffer 
    } 

    def f1(map:Map[String,Any]):Double = { 
// println("hello from f1") 
     0.0 
    } 

    def f2(map:Map[String,Any]):Double = { 
// println("hello from f2") 
     0.0 
    } 

    def f3(map:Map[String,Any]):Double = { 
// println("hello from f3") 
     0.0 
    } 

    def f4(map:Map[String,Any]):Double = { 
// println("hello from f4") 
     0.0 
    } 

    def f5(map:Map[String,Any]):Double = { 
// println("hello from f5") 
     0.0 
    } 

    def computeValues(immutableBuffer:IndexedSeq[Double]):Map[String,Double] = { 
     var map = Map[String,Double]() 
     try { 
      functionsToCompute.foreach(function => { 
       val value = functionMap(function) 
       function match { 
        case "f1" => 
         var v = value(Map("lookback"->10,"buffer"->immutableBuffer,"parm1"->0.0)) 
         map += {function -> v} 
        case "f2" => 
         var v = value(Map("lookback"->20,"buffer"->immutableBuffer)) 
         map += {function -> v} 
        case "f3" => 
         var v = value(Map("lookback"->30,"buffer"->immutableBuffer,"parm1"->1.0,"parm2"->false)) 
         map += {function -> v} 
        case "f4" => 
         var v = value(Map("lookback"->40,"buffer"->immutableBuffer)) 
         map += {function -> v} 
        case "f5" => 
         var v = value(Map("buffer"->immutableBuffer)) 
         map += {function -> v} 
        case _ => 
         println(this.unhandled()) 
       } 
      }) 
     } catch { 
      case ex: Exception => 
       ex.printStackTrace() 
     } 
     map 
    } 

    def receive = { 
     case DataDelivery(data) => 
     val startTime = System.nanoTime()/1000 
     val answers = computeValues(updateData(data)) 
     val endTime = System.nanoTime()/1000 
     val elapsedTime = endTime - startTime 
     println("elapsed time is " + elapsedTime) 
     // reply or forward 
     case msg => 
     println("msg is " + msg) 
    } 

} 

object Test { 
    def main(args:Array[String]) { 
     val system = ActorSystem("actorSystem") 
     val computeActor = system.actorOf(Props(new ComputeActor),"computeActor") 
     var i = 0 
     while (i < 1000) { 
      computeActor ! DataDelivery(i.toDouble) 
      i += 1 
     } 
    } 
} 

를이 (마이크로로 변환) 출력은

elapsed time is 4898 
elapsed time is 184 
elapsed time is 144 
    . 
    . 
    . 
elapsed time is 109 
elapsed time is 103 

당신은 JVM의 증가 컴파일러에서 발로 볼 수있다 생각

이 빠른 하나. 승리는

functionsToCompute.foreach(function => { 

을 변경할 수 있습니다

functionsToCompute.par.foreach(function => { 

는하지만, 이것은 다음과 같은 경과 시간에 결과

elapsed time is 31689 
elapsed time is 4874 
elapsed time is 622 
    . 
    . 
    . 
elapsed time is 698 
elapsed time is 2171 

일부 정보 :

1) 나는 2 개 코어 맥북 프로에서이 작업을 실행하고 있습니다.

2) 전체 버전에서 함수는 변경 가능한 공유 버퍼의 일부를 루프하는 장기 실행 연산입니다. 액터의 사서함에서 메시지를 가져 오는 것이 흐름을 제어하기 때문에 문제가되지는 않지만 증가 된 동시성으로 인해 문제가 될 수 있습니다. 이것이 내가 IndexedSeq로 변환 한 이유입니다. functionMap에서 모든 항목이 반드시 호출되도록 전체 버전에서

3 은)는 functionsToCompute 목록이 다를 수 있습니다 (예) functionMap.size는 functionsToCompute.size

4) 기능보다 훨씬 클 수 있습니다 병렬로 계산 될 수 있지만, 결과지도

몇 가지 질문을 반환하기 전에 완료해야합니다 :

1) 나는 빠른 병렬 버전 실행을 위해 무엇을 할 수 있는가?

2) 비 차단 및 차단 선물을 추가하는 것이 어디에서 의미가 있습니까?

3) 계산을 다른 배우에게 전달하는 것이 어디에서 의미가 있습니까?

4) 불변성/안전성을 높이기위한 몇 가지 기회가 있습니까?

감사합니다, 요청에 따라 브루스

+1

나는 당신이 당신의 대답으로 무엇을하고 있는지 확신하지 못한다. 그러나 그것은 나에게 흥미로운 점이다. 여기서 akka.dispatch.Future.sequence를 잘 활용할 수있는 것 같습니다. 계산을 수행하고있는 Future의 목록을 만들어 결과 목록에서 Future로 변환합니다. 그 미래가 돌아 오면 필요한 집계표/목록/컨테이너로 결과를 접으십시오. –

+0

@DerekWyatt. 정식 버전에서는 답변이 다른 배우에게 전달됩니다. 귀하의 의견에 감사드립니다. –

+0

왜 각 기능에 대해 하나의 액터가 없습니까? –

답변

2

이 (지연 미안 나는 정도에 통지가없는 ...) 예를 제공.

Akka 설명서 Section on 'Composing Futures'에 좋은 예가 있지만 상황에 맞게 조금 더 알려 드리겠습니다.

이제 이것을 읽은 후 Akka 웹 사이트의 튜토리얼 및 문서를 읽으십시오. 해당 문서가 제공 할 핵심 정보가 많이 누락되었습니다.

import akka.dispatch.{Await, Future, ExecutionContext} 
import akka.util.duration._ 
import java.util.concurrent.Executors 

object Main { 
    // This just makes the example work. You probably have enough context 
    // set up already to not need these next two lines 
    val pool = Executors.newCachedThreadPool() 
    implicit val ec = ExecutionContext.fromExecutorService(pool) 

    // I'm simulating your function. It just has to return a tuple, I believe 
    // with a String and a Double 
    def theFunction(s: String, d: Double) = (s, d) 
    def main(args: Array[String]) { 
    // Here we run your functions - I'm just doing a thousand of them 
    // for fun. You do what yo need to do 
    val listOfFutures = (1 to 1000) map { i => 
     // Run them in parallel in the future 
     Future { 
     theFunction(i.toString, i.toDouble) 
     } 
    } 
    // These lines can be composed better, but breaking them up should 
    // be more illustrative. 
    // 
    // Turn the list of Futures (i.e. Seq[Future[(String, Double)]]) into a 
    // Future with a sequence of results (i.e. Future[Seq[(String, Double)]]) 
    val futureOfResults = Future.sequence(listOfFutures) 

    // Convert that future into another future that contains a map instead 
    // instead of a sequence 
    val intermediate = futureOfResults map { _.toList.toMap } 

    // Wait for it complete. Ideally you don't do this. Continue to 
    // transform the future into other forms or use pipeTo() to get it to go 
    // as a result to some other Actor. "Await" is really just evil... the 
    // only place you should really use it is in silly programs like this or 
    // some other special purpose app. 
    val resultingMap = Await.result(intermediate, 1 second) 
    println(resultingMap) 

    // Again, just to make the example work 
    pool.shutdown() 
    } 
} 

이 실행을 얻을 수 있도록 클래스 경로에 필요한 것은 akka-actor 항아리입니다. Akka 웹 사이트는 필요한 것을 설정하는 방법을 알려주지 만, 실제로는 간단합니다.

+0

도움을 주셔서 감사합니다. 이것은 훌륭하고 내 질문의 대부분을 다룹니다. –

+0

... 그렇습니다. 필자는 확실히 문서를 읽을 것입니다. –

+0

이것은 꽤 멋진 것들입니다. 나는 Await를 onComplete와 바꾸었고 멋지게 작동했습니다. 다음에 pipeTo()를 시도해 보겠습니다. –