2017-12-20 44 views
0

큰 시스템 내에서 처리되는 파일의 진행을 모니터하고 지속시키는 유틸리티를 빌드 중입니다. 이 파일은 큰 "text"파일 인 .csv, .xls, .txt 등입니다.이 파일은 Kafka에서 스트리밍 데이터로, Avro에 쓰거나 SQL DB로 대량 배치로 작성할 수 있습니다. 나는 처리 된 행의 수를 기록하고 RESTful API 호출을 사용하여 DB에 진행을 지속하는 "catchall"유틸리티를 빌드하려고합니다.Future가 while 루프에서 호출 될 때마다 호출되지 않습니다.

처리는 항상 처리 유형에 관계없이 Akka Actor 내에서 수행됩니다. 처리 진행을 막지 않도록 비동기 적으로 로깅 진행을 시도하고 있습니다. 진행은 매우 빠르게 진행됩니다.

//inside my processing actor 

    var fileIsProcessing = true 
    val allLines = KafkaUtil.getConnect(fileKey) 
    val totalLines = KafkaUtil.getSize 
    val batchSize = 500 
    val dBUtil = new DBUtil(totalLines) 

while (fileIsProcessing) { 

    // consumes @ 500 lines at a time to process, returns empty if done consuming 
    val batch:List[Pollable] = allLines.poll 
    //for batch identification purposes 
    val myMax = batch.map(_.toInt).max 
    println("Starting new batch with max line: " + myMax) 

    //processing work happens here 
    batch.map(processSync) 
    println("Finished processing batch with max line: " + myMax) 

    //send a progress update to be persisted to the DB 
    val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)} 
    progressCall.onComplete{ 
      case Success(s) => // don't care 
      case Failure(e) => logger.error("Unable to persist progress from actor ") 
    } 

if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional. 
} 

그리고, 간단한 표현 : 때때로 하나 점진적으로 하나를 이동 않지만 그것의 대부분은 비슷한 배치 스타일의 형식에서 일어나는, 여기에 기본 단지 데모 처리에 무슨 일이 일어날 지의 표현입니다 라인 Future[Unit] { dBUtil.incrementProgress(batch.size)} 안정적으로 매번 호출되지 않는, 너무 빨리

class DBUtil(totalLines:Int) { 

    //store both the number processed and the total to process in db, even if there is currently a percentage 

var rate = 0 //lines per second 
var totalFinished = 0 
var percentageFin:Double = 0 
var lastUpdate = DateTime.now() 

def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = { 
    //simulate write the data and calculated progress percentage to db 
    rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000) 
    totalFinished += totalProcessed 
    percentageFin = (totalFinished.toDouble/totalLines.toDouble) * 100 
    println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate") 
} 

}

지금, 생산에, 정말 이상한 것은, 처리가 발생 : 내 DBUtil의 클래스는 처리를하고 . while 루프가 완료되지만 내 DB에서 진행률이 50 % 또는 80 %로 줄 것입니다. 그것이 작동하는 유일한 방법은 시스템 속도를 늦추려면 logger 또는 println 명령문을 사용하여 시스템을 작동하지 못하게하는 것입니다.

미래 전화가 매번 전화를 걸지 않는 이유는 무엇입니까?

+1

'DBUtil'에 동기화가없는 의사 코드가 표시됩니다. 예외를 던져 상상도 못할 것이라고 상상하기 쉽습니다. –

+0

필자는 오류를 확실히 확인하고 적절한 곳에 '시도하기'가 사용되며 Future는 종종 .onComplete {case Success => ... case Failure => ...}'위의 의사 코드는 가능한 한 제거됩니다 추상화의 목적으로. 어떤 이유로 든 미래 전화가 "건너 뛴다"는 이유가 있을지 궁금합니다. – NateH06

답변

1

은 음 ... 그래서 당신이 가지고있는 코드,

당신은 당신의 while 루프의 미래를 시작하고 다음 루프 미래가 완료 될 때까지 기다리지 않고 다음 반복 진행에 몇 가지 문제가있다. 즉, 선물이 실제로 집행자에 의해 실행되기 전에 프로그램이 종료 될 수 있음을 의미합니다.

또한 루프가 더 많은 "미래 지향적"인 호출을 dBUtil.incrementProgress(batch.size)으로 만들고 있습니다. 동일한 기능을 동시에 실행하는 여러 스레드가 있습니다. 이것은 가변 상태를 사용하는 경주 조건을 유발합니다.

def processFileWithIncrementalUpdates(
    allLines: ????, 
    totalLines: Int, 
    batchSize: Int, 
    dbUtil: DBUtil 
): Future[Unit] = { 
    val promise = Promise[Unit]() 
    Future { 
    val batch: List[Pollable] = allLines.poll 
    if (batch.isEmpty) { 
     promise.completeWith(Future.successful[Unit]()) 
    } 
    else { 
     val myMax = batch.map(_.toInt).max 
     println("Starting new batch with max line: " + myMax) 

     //processing work happens here 
     batch.map(processSync) 
     println("Finished processing batch with max line: " + myMax) 

     //send a progress update to be persisted to the DB 
     val progressCall = Future[Unit] { dBUtil.incrementProgress(batch.size) } 

     progressCall.onComplete{ 
     case Success(s) => // don't care 
     case Failure(e) => logger.error("Unable to persist progress from actor ") 
     } 

     progressCall.onComplete({ 
     case _ => promise.completeWith(processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil)) 
     }) 
    } 
    promise.future 
    } 
} 

val allLines = KafkaUtil.getConnect(fileKey) 
val totalLines = KafkaUtil.getSize 
val batchSize = 500 
val dBUtil = new DBUtil(totalLines) 

val processingFuture = processFileWithIncrementalUpdates(allLines, totalLines, batchSize, dBUtil) 
+0

> 당신은 당신의 while 루프에서 미래를 시작할뿐입니다. 그리고 당신의 루프는 미래가 끝나기를 기다리지 않고 다음 반복을 위해갑니다. 어느 것이> 미래가 실제로 집행자에 의해 집행되기 전에 프로그램이 끝날 수 있음을 의미합니다. 이것은 정확히 내가 원하는 것입니다. 그러나 미래에 대한 호출은 실제로 "잃어 버렸"습니까? 나는 모든 루프가 끝날 것이라고 생각하지만 다시 돌아가서 모든 미래의 전화를 "따라 잡으려고"노력할 것입니다. 나는 "연기와 잊음"과 그 순서를 유지하는 큐에 대해 액터에서 전화를하는 것을 정말로지지했지만, 오버 헤드로 인해 거절당했습니다. – NateH06

+0

큰 따옴표의 서식에 대해 죄송합니다. 충분히 빠르게 수정하지 않았습니다. – NateH06

+0

주된 문제는 당신이'incrementProgress' 함수를 호출하는 다른 스레드들 사이에서 가변적 인 상태를 공유했다는 것입니다. –