0

좋은 아침. 첫 번째 단계는 실제 간단한 사용 사례입니다. 고객 + 점수.스파크 카산드라 커넥터 : SQLContext.read + SQLContext.write 대 수동 구문 분석 및 삽입 (JSON -> 카산드라) 난 그냥 아파치 스파크와 아파치 카산드라를 조사하기 시작</p> <p>

카산드라 테이블에는 고객이 PrimaryKey로 있습니다. 카산드라는 로컬에서만 실행됩니다 (클러스터가 전혀 없습니다!).

SparkJob (독립 실행 형 로컬 [2])이 JSON 파일을 구문 분석 한 다음 전체 내용을 Cassandra에 쓰고 있습니다.

먼저 용액

val conf = new SparkConf().setAppName("Application").setMaster("local[2]") 
val sc = new SparkContext(conf) 
val cass = CassandraConnector(conf) 

val customerScores = sc.textFile(file).cache() 

val customerScoreRDD = customerScores.mapPartitions(lines => { 
    val mapper = new ObjectMapper with ScalaObjectMapper 
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 
    mapper.registerModule(DefaultScalaModule) 
    lines 
    .map(line => { 
     mapper.readValue(line, classOf[CustomerScore]) 
    }) 
    //Filter corrupt ones: empty values 
    .filter(customerScore => customerScore.customer != null && customerScore.score != null) 
}) 


customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session => { 
    val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)") 
    rows.foreach(row => { 
    session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score)) 
    }) 
})) 

sc.stop() 

수동 모든 일 라인을 파싱하고 산드 수단에 삽입했다.

대략 10000000 레코드 (SparkContext 만들기 포함 ... 등)의 경우 약 714020 ms이 필요합니다.

는 다음 나는 스파크 카산드라 커넥터에 대해 읽고 다음 한 :

val conf = new SparkConf().setAppName("Application").setMaster("local[2]") 
val sc = new SparkContext(conf) 
var sql = new SQLContext(sc) 

val customerScores = sql.read.json(file) 

val customerScoresCorrected = customerScores 
    //Filter corrupt ones: empty values 
    .filter("customer is not null and score is not null") 
    //Filter corrupt ones: invalid properties 
    .select("customer", "score") 

customerScoresCorrected.write 
    .format("org.apache.spark.sql.cassandra") 
    .mode(SaveMode.Append) 
    .options(Map("keyspace" -> "playground", "table" -> "customer_score")) 
    .save() 

sc.stop() 

그래서 훨씬 간단 필요한 코드의 의미에서 주어진 API를 사용하여.

이 해결 방법은 약 10000000 개의 레코드에 대해 대략 1232871ms을 사용합니다 (다시 말하면 모두 동일한 측정 포인트).

(수동 구문 분석 플러스 1,530,877 MS 소요 saveToCassandra를 사용하여뿐만 아니라, 세 번째 솔루션을했다) 이제 내 질문 :

어떤 방법으로 수행 할 수있는 "올바른"방법입니다 이 유스 케이스는 요즘의 "최고의 실천"(그리고 실제로 시나리오에서는 캐스 산드라와 스파크를 모으는 것 중 가장 성능이 좋은 것)인가? 내 결과에서 SQLContext.read + SQLContext.write 대신 "수동"물건을 사용합니다.

미리 의견을 보내 주셔서 감사합니다.

+1

RDD를 작성할 때 ('saveToCassandra' 사용) Cassandra 커넥터를 사용하면 좋은 결과를 얻었습니다. DataFrames 대신 RDD를 사용하면 Cassandra 토큰 범위 ('repartitionByCassandraReplica' 사용)에 따라 다시 분할 할 수 있으므로 대부분의 쓰기가 지역적이므로 많은 Cassandra 코디네이터 작업을 피할 수 있습니다. – LiMuBei

+0

감사합니다. @LiMuBei, 이것은 실제로 saveToCassandra를 로컬 테스트 케이스에서도 조금 줄입니다. "수동"솔루션 (첫 번째 코드 스 니펫)이 여전히 가장 빠른 것 같습니다. – markush81

+1

첫 번째 솔루션에서 비동기 작업이 실제로 완료 될 때까지 기다리지 않아도 될까요? 이 방법은 모든 삽입 작업이 성공적으로 완료되었다고 보장 할 수 없습니다. – LiMuBei

답변

0

사실 이제는 오랜 시간이 지난 후에 다음을 고려해야합니다. 데이터 물론 금액의

  • 데이터의
  • 유형 : 파티션 키의 특히 다양한 (중복 많은 대 각각 다른)
  • 환경 : 스파크 집행 인, 카산드라 노드, 복제 ... 내 유스 케이스는

    def initSparkContext: SparkContext = { 
        val conf = new SparkConf().setAppName("Application").setMaster("local[2]") 
         // since we have nearly totally different PartitionKeys, default: 1000 
         .set("spark.cassandra.output.batch.grouping.buffer.size", "1") 
         // write as much concurrently, default: 5 
         .set("spark.cassandra.output.concurrent.writes", "1024") 
         // batch same replica, default: partition 
         .set("spark.cassandra.output.batch.grouping.key", "replica_set") 
        val sc = new SparkContext(conf) 
        sc 
    } 
    

    와 장난 들어

는 속도 dramatica을 강화했다 내 로컬 런에서 lly.

그래서 가장 좋은 방법을 얻으려면 다양한 매개 변수를 시험해 볼 필요가 있습니다. 적어도 그것이 내가 가진 결론입니다.