좋은 아침. 첫 번째 단계는 실제 간단한 사용 사례입니다. 고객 + 점수.스파크 카산드라 커넥터 : SQLContext.read + SQLContext.write 대 수동 구문 분석 및 삽입 (JSON -> 카산드라) 난 그냥 아파치 스파크와 아파치 카산드라를 조사하기 시작</p> <p>
카산드라 테이블에는 고객이 PrimaryKey로 있습니다. 카산드라는 로컬에서만 실행됩니다 (클러스터가 전혀 없습니다!).
SparkJob (독립 실행 형 로컬 [2])이 JSON 파일을 구문 분석 한 다음 전체 내용을 Cassandra에 쓰고 있습니다.
먼저 용액
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val cass = CassandraConnector(conf)
val customerScores = sc.textFile(file).cache()
val customerScoreRDD = customerScores.mapPartitions(lines => {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
lines
.map(line => {
mapper.readValue(line, classOf[CustomerScore])
})
//Filter corrupt ones: empty values
.filter(customerScore => customerScore.customer != null && customerScore.score != null)
})
customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session => {
val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)")
rows.foreach(row => {
session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score))
})
}))
sc.stop()
수동 모든 일 라인을 파싱하고 산드 수단에 삽입했다.
대략 10000000 레코드 (SparkContext 만들기 포함 ... 등)의 경우 약 714020 ms이 필요합니다.
는 다음 나는 스파크 카산드라 커넥터에 대해 읽고 다음 한 :val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
var sql = new SQLContext(sc)
val customerScores = sql.read.json(file)
val customerScoresCorrected = customerScores
//Filter corrupt ones: empty values
.filter("customer is not null and score is not null")
//Filter corrupt ones: invalid properties
.select("customer", "score")
customerScoresCorrected.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.options(Map("keyspace" -> "playground", "table" -> "customer_score"))
.save()
sc.stop()
그래서 훨씬 간단 필요한 코드의 의미에서 주어진 API를 사용하여.
이 해결 방법은 약 10000000 개의 레코드에 대해 대략 1232871ms을 사용합니다 (다시 말하면 모두 동일한 측정 포인트).
는(수동 구문 분석 플러스 1,530,877 MS 소요 saveToCassandra
를 사용하여뿐만 아니라, 세 번째 솔루션을했다) 이제 내 질문 :
어떤 방법으로 수행 할 수있는 "올바른"방법입니다 이 유스 케이스는 요즘의 "최고의 실천"(그리고 실제로 시나리오에서는 캐스 산드라와 스파크를 모으는 것 중 가장 성능이 좋은 것)인가? 내 결과에서 SQLContext.read
+ SQLContext.write
대신 "수동"물건을 사용합니다.
미리 의견을 보내 주셔서 감사합니다.
RDD를 작성할 때 ('saveToCassandra' 사용) Cassandra 커넥터를 사용하면 좋은 결과를 얻었습니다. DataFrames 대신 RDD를 사용하면 Cassandra 토큰 범위 ('repartitionByCassandraReplica' 사용)에 따라 다시 분할 할 수 있으므로 대부분의 쓰기가 지역적이므로 많은 Cassandra 코디네이터 작업을 피할 수 있습니다. – LiMuBei
감사합니다. @LiMuBei, 이것은 실제로 saveToCassandra를 로컬 테스트 케이스에서도 조금 줄입니다. "수동"솔루션 (첫 번째 코드 스 니펫)이 여전히 가장 빠른 것 같습니다. – markush81
첫 번째 솔루션에서 비동기 작업이 실제로 완료 될 때까지 기다리지 않아도 될까요? 이 방법은 모든 삽입 작업이 성공적으로 완료되었다고 보장 할 수 없습니다. – LiMuBei