2017-05-13 6 views
0

캐시/DB에 비즈니스 데이터가 있습니다. 우리는 매일 수 많은 로그 데이터를 처리하고 데이터 캐시/DB를 업데이트합니다. 이러한 업데이트 중 일부는 거의 실시간으로 발생하고 일부는 일괄 적으로 발생합니다. 우리는 많은 변화를 일으키는 일자리를 촉발 시켰습니다. 우리는 스파크 작업의 결과를 텍스트 파일에 저장하고 나중에 다른 순차 작업을 실행하여 캐시/DB에 저장합니다.memcache/reddis/DB 데이터를 스파크 작업에서 참조 또는 업데이트 할 수 있습니까?

나는 커넥터 (mongoDB-spark 커넥터, redis-spark 커넥터)를 사용하고 전체 데이터를 RDD로 가져 와서 처리하는 방법을 고려했습니다. 그러나 비즈니스 데이터의 크기는 로그 파일과 우리가 매일 수행하는 업데이트를 비교하면 엄청납니다. 그래서, 그것을 떨어 뜨렸다.

질문 :

  1. 우리가 캐시에 집행에서 직접 업데이 트를 할 수/DB 피하기가 마지막 단계 그래서?
  2. 더 나은 성능을위한 다른 제안이나 대안이 있습니까?
  3. 여기에 반 패턴이 보입니까? DB를 당신의 쓰기가 간단한 경우
+0

문제가 무엇인지 확실하지 않습니다. 예, Spark에서 직접 DB 또는 Java/scala 커넥터가있는 다른 요소에 쓸 수 있습니다. – summerbulb

+0

당신이 말한 것에 동의하십시오, 확실히 가능합니다. 하지만 모든 레코드 변환에 대한 연결 열기 및 닫기의 성능에 대해 궁금합니다. @ executor 수준의 연결을 공유하는 다른 더 좋은 방법이 있습니까? 연결 방송은 내가 추측하는 데 도움이되지 않습니다. – arunk2

답변

0

, 당신은 사용하여 DB에 바로 쓸 수 있습니다 :

myDF.write 
    .mode("overwrite") // Choose the mode you want from org.apache.spark.sql.SaveMode 
    .jdbc(url, "my_table", props) 

를 쿼리가 단순한 삽입 이상의 경우 (내가 on duplicate key update 섹션 질의를했다 예를 들어, 내 쿼리에서), 당신은 직접해야합니다.

mapPartitions()을 사용하면 파티션간에 쓰기를 분산시킬 수 있습니다.

myDF.mapPartitions(rows => { 
    val connection = DriverManager.getConnection(URL, properties) 

    rows.foreach(bulk => { 
     val statement = connection.prepareStatement(myQuery) 
     bulk.foreach(row => { 
     statement.setString(1, row.getString(0)) 
     statement.setInt(2, row.getInt(1)) 
     ... 
     statement.addBatch() 
     }) 

     statement.executeLargeBatch().iterator 
    }) 
    rows 
}).count //An action here is required, to trigger the mapPartitions()