큰 시스템 내에서 처리되는 파일의 진행을 모니터하고 지속시키는 유틸리티를 빌드 중입니다. 이 파일은 큰 "text"파일 인 .csv, .xls, .txt 등입니다.이 파일은 Kafka에서 스트리밍 데이터로, Avro에 쓰거나 SQL DB로 대량 배치로 작성할 수 있습니다. 나는 처리 된 행의 수를 기록하고 RESTful API 호출을 사용하여 DB에 진행을 지속하는 "catchall"유틸리티를 빌드하려고합니다.Future가 while 루프에서 호출 될 때마다 호출되지 않습니다.
처리는 항상 처리 유형에 관계없이 Akka Actor 내에서 수행됩니다. 처리 진행을 막지 않도록 비동기 적으로 로깅 진행을 시도하고 있습니다. 진행은 매우 빠르게 진행됩니다.
//inside my processing actor
var fileIsProcessing = true
val allLines = KafkaUtil.getConnect(fileKey)
val totalLines = KafkaUtil.getSize
val batchSize = 500
val dBUtil = new DBUtil(totalLines)
while (fileIsProcessing) {
// consumes @ 500 lines at a time to process, returns empty if done consuming
val batch:List[Pollable] = allLines.poll
//for batch identification purposes
val myMax = batch.map(_.toInt).max
println("Starting new batch with max line: " + myMax)
//processing work happens here
batch.map(processSync)
println("Finished processing batch with max line: " + myMax)
//send a progress update to be persisted to the DB
val progressCall = Future[Unit] {dBUtil.incrementProgress(batch.size)}
progressCall.onComplete{
case Success(s) => // don't care
case Failure(e) => logger.error("Unable to persist progress from actor ")
}
if (batch.isEmpty) fileIsProcessing = false //this is horribly non-functional.
}
그리고, 간단한 표현 : 때때로 하나 점진적으로 하나를 이동 않지만 그것의 대부분은 비슷한 배치 스타일의 형식에서 일어나는, 여기에 기본 단지 데모 처리에 무슨 일이 일어날 지의 표현입니다 라인 Future[Unit] { dBUtil.incrementProgress(batch.size)}
안정적으로 매번 호출되지 않는, 너무 빨리
class DBUtil(totalLines:Int) {
//store both the number processed and the total to process in db, even if there is currently a percentage
var rate = 0 //lines per second
var totalFinished = 0
var percentageFin:Double = 0
var lastUpdate = DateTime.now()
def incrementProgress(totalProcessed: Int, currentTime:DateTime): Unit = {
//simulate write the data and calculated progress percentage to db
rate = totalProcessed/((currentTime.getMillis() - lastUpdate.getMillis())/1000)
totalFinished += totalProcessed
percentageFin = (totalFinished.toDouble/totalLines.toDouble) * 100
println(s"Simulating DB persist of total processed:$totalFinished lines at $percentageFin% from my total lines: $totalLines at rate:$rate")
}
}
지금, 생산에, 정말 이상한 것은, 처리가 발생 : 내 DBUtil의 클래스는 처리를하고 . while
루프가 완료되지만 내 DB에서 진행률이 50 % 또는 80 %로 줄 것입니다. 그것이 작동하는 유일한 방법은 시스템 속도를 늦추려면 logger
또는 println
명령문을 사용하여 시스템을 작동하지 못하게하는 것입니다.
미래 전화가 매번 전화를 걸지 않는 이유는 무엇입니까?
'DBUtil'에 동기화가없는 의사 코드가 표시됩니다. 예외를 던져 상상도 못할 것이라고 상상하기 쉽습니다. –
필자는 오류를 확실히 확인하고 적절한 곳에 '시도하기'가 사용되며 Future는 종종 .onComplete {case Success => ... case Failure => ...}'위의 의사 코드는 가능한 한 제거됩니다 추상화의 목적으로. 어떤 이유로 든 미래 전화가 "건너 뛴다"는 이유가 있을지 궁금합니다. – NateH06