2015-01-21 7 views
0

내 응용 프로그램에서는 여러 HDFS 노드의 데이터를 가져 오는 스레드가 여러 개 있어야합니다. 이를 위해 나는 스레드 실 행자 풀과 포크 스레드를 사용하고 있습니다. 에서 포크 (fork) :스칼라의 스레드 실행자 풀 대체

val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]] 
    pathSuffixList.foreach(block => { 
    ConsumptionExecutor.execute(new Consumption(webHdfsUri,block)) 
    }) 

내 수준의 소비 :

class Consumption(webHdfsUri: String, block:Map[String,Any]) extends Runnable { 

     override def run(): Unit = { 
     val uriSplit = webHdfsUri.split("\\?") 
     val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN" 
     val inputStream = new URL(fileOpenUri).openStream() 
     val datumReader = new GenericDatumReader[Void]() 
     val dataStreamReader = new DataFileStream(inputStream, datumReader) 
     //  val schema = dataStreamReader.getSchema() 
     val dataIterator = dataStreamReader.iterator() 
     while (dataIterator.hasNext) { 
      println(" data : " + dataStreamReader.next()) 
     } 
     } 

    } 

ConsumptionExecutor : 나는에 어디 Akka 스트리밍/Akka 배우를 사용하려면 그러나

object ConsumptionExecutor{ 

    val counter: AtomicLong = new AtomicLong() 

    val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory { 
    def newThread(r: Runnable): Thread = { 
     val thread: Thread = new Thread(r) 
     thread.setName("ConsumptionExecutor-" + counter.incrementAndGet()) 
     thread 
    } 
    }) 
    executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200) 

    def execute(trigger: Runnable) { 
    executionContext.execute(trigger) 
    } 

} 

giv 할 필요가 없다. 고정 된 스레드 풀 크기와 Akka가 모든 것을 처리합니다. 저는 Akka와 스트리밍 및 배우의 개념에 새로운 경험이 있습니다. 다른 사람이 내 유스 케이스에 맞게 샘플 코드 형태로 어떤 단서를 줄 수 있습니까? 미리 감사드립니다.

답변

1

아이디어는 FlowGraph에 여러 Source의에서 다음 Merge 그들 당신이 읽는 각 HDFS 노드 ActorPublisher의 (서브 클래스) 인스턴스를 생성, 그리고 것입니다.

ActorPublisher 소스의 세부 밖으로 남아있는이 의사 코드 같은

뭔가 : 이것은 그냥 반복과에 가장자리를 추가하여 배우 소스의 수집을 위해 개선 할 수

val g = PartialFlowGraph { implicit b => 
    import FlowGraphImplicits._ 
    val in1 = actorSource1 
    val in2 = actorSource2 
    // etc. 

    val out = UndefinedSink[T] 
    val merge = Merge[T] 

    in1 ~> merge ~> out 
    in2 ~> merge 
    // etc. 
} 

각각에 대해 merge이지만 아이디어가 있습니다.