내 응용 프로그램에서는 여러 HDFS 노드의 데이터를 가져 오는 스레드가 여러 개 있어야합니다. 이를 위해 나는 스레드 실 행자 풀과 포크 스레드를 사용하고 있습니다. 에서 포크 (fork) :스칼라의 스레드 실행자 풀 대체
val pathSuffixList = fileStatuses.getOrElse("FileStatus", List[Any]()).asInstanceOf[List[Map[String, Any]]]
pathSuffixList.foreach(block => {
ConsumptionExecutor.execute(new Consumption(webHdfsUri,block))
})
내 수준의 소비 :
class Consumption(webHdfsUri: String, block:Map[String,Any]) extends Runnable {
override def run(): Unit = {
val uriSplit = webHdfsUri.split("\\?")
val fileOpenUri = uriSplit(0) + "/" + block.getOrElse("pathSuffix", "").toString + "?op=OPEN"
val inputStream = new URL(fileOpenUri).openStream()
val datumReader = new GenericDatumReader[Void]()
val dataStreamReader = new DataFileStream(inputStream, datumReader)
// val schema = dataStreamReader.getSchema()
val dataIterator = dataStreamReader.iterator()
while (dataIterator.hasNext) {
println(" data : " + dataStreamReader.next())
}
}
}
ConsumptionExecutor : 나는에 어디 Akka 스트리밍/Akka 배우를 사용하려면 그러나
object ConsumptionExecutor{
val counter: AtomicLong = new AtomicLong()
val executionContext: ExecutorService = Executors.newCachedThreadPool(new ThreadFactory {
def newThread(r: Runnable): Thread = {
val thread: Thread = new Thread(r)
thread.setName("ConsumptionExecutor-" + counter.incrementAndGet())
thread
}
})
executionContext.asInstanceOf[ThreadPoolExecutor].setMaximumPoolSize(200)
def execute(trigger: Runnable) {
executionContext.execute(trigger)
}
}
giv 할 필요가 없다. 고정 된 스레드 풀 크기와 Akka가 모든 것을 처리합니다. 저는 Akka와 스트리밍 및 배우의 개념에 새로운 경험이 있습니다. 다른 사람이 내 유스 케이스에 맞게 샘플 코드 형태로 어떤 단서를 줄 수 있습니까? 미리 감사드립니다.