2016-07-30 2 views
2

필드에 의한 데이터를 미리 정의 된 파티션 수로 파티션하는 가장 좋은 방법은 무엇입니까?열을 기준으로 파티션을 나누지 만 고정 된 파티션 수를 유지하는 효율적인 방법은 무엇입니까?

현재 partionCount = 600을 지정하여 데이터를 파티션하고 있습니다. 600이라는 숫자는 내 데이터 집합/클러스터 설정에 대해 최상의 쿼리 성능을 제공합니다.

val rawJson = sqlContext.read.json(filename).coalesce(600) 
rawJson.write.parquet(filenameParquet) 

는 지금은 열 'EVENTNAME'에 의해이 데이터를 분할하지만, 현재 약 2,000 독특한 eventNames 플러스 각 EVENTNAME의 행 번호가 균일하지 여전히 개수 (600)의 데이터를 유지하려는. 약 10 개의 eventNames에는 데이터의 50 % 이상이 데이터 비뚤어 짐을 일으 킵니다. 그러므로 아래처럼 분할하면 그다지 성능이 떨어진다. 글을 쓰는 데는 5 배나 많은 시간이 걸립니다.

val rawJson = sqlContext.read.json(filename) 
rawJson.write.partitionBy("eventName").parquet(filenameParquet) 

이러한 시나리오의 데이터를 분할하는 좋은 방법은 무엇입니까? eventName으로 파티션을 나누어도 600 개의 파티션으로 나누는 방법이 있습니까?

내 스키마는 다음과 같습니다

{ 
    "eventName": "name1", 
    "time": "2016-06-20T11:57:19.4941368-04:00", 
    "data": { 
    "type": "EventData", 
    "dataDetails": { 
     "name": "detailed1", 
     "id": "1234", 
... 
... 
    } 
    } 
} 

감사합니다!

답변

0

목록 버킷 링 개념을 적용 해 보았습니까? 언급 한 10 개의 이벤트 이름과 같이 열을 왜곡 할 파티션이 거의 없습니다. 나머지의 경우 다른 모든 키를 보유 할 파티션/디렉토리가 하나만있을 수 있습니다. here을 볼 수 있습니다. 대부분 80-20 규칙을 대상으로합니다.

0

이것은 왜곡 된 데이터의 일반적인 문제이며 취할 수있는 몇 가지 방법이 있습니다.

목록 버켓 팅은 시간 경과에 따라 왜곡이 안정적으로 유지되면 효과적 일 수 있습니다. 특히 파티션 변수의 새 값이 도입되는 경우 특히 그렇습니다. 필자는 시간이 지남에 따라 목록 버킷을 조정하는 것이 얼마나 쉬운 지 조사하지 않았으며 주석이 말한 것처럼 Spark 2.0 기능이기 때문에 어쨌든 사용할 수 없습니다.

1.6.x를 사용하는 경우 각 이벤트 이름을 600 개의 고유 한 값 중 하나로 매핑하는 고유 한 기능을 만들 수 있습니다. 이것을 UDF 또는 case 표현식으로 할 수 있습니다. 그런 다음 해당 함수를 사용하여 열을 만든 다음 coalesce(600)이 아닌 repartition(600, 'myPartitionCol)을 사용하여 해당 열을 기준으로 분할하면됩니다.

매우 왜곡 된 데이터를 Swoop에 처리 했으므로 필자는 다음과 같은 주력 데이터 구조가 파티션 관련 도구를 만드는 데 매우 유용하다는 것을 알았습니다. 데이터가 왜곡이며, 키의 수가 적은 사람이 그래서 우리는 비대칭 키에 잠시 동안 파티션의 수를 늘릴 수있다 : 우리가 약간 다른 경우의 파티션 프로그램을 구축하는 방법을

/** Given a key, returns a random number in the range [x, y) where 
    * x and y are the numbers in the tuple associated with a key. 
    */ 
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable { 
    private val r = new java.util.Random() // Scala Random is not serializable in 2.10 

    def apply(key: A): Int = { 
    val (start, end) = m(key) 
    start + r.nextInt(end - start) 
    } 

    override def toString = s"RandomRangeMap($r, $m)" 
} 

예를 들어, 여기에 키 당 파티션의 최소 개수로 1 고수 : 귀하의 경우

/** Partitions data such that each unique key ends in P(key) partitions. 
    * Must be instantiated with a sequence of unique keys and their Ps. 
    * Partition sizes can be highly-skewed by the data, which is where the 
    * multiples come in. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner { 
    private val rrm = new RandomRangeMap(
    keyMap.keys 
     .zip(
     keyMap.values 
      .scanLeft(0)(_+_) 
      .zip(keyMap.values) 
      .map { 
      case (start, count) => (start, start + count) 
      } 
    ) 
     .toMap 
) 

    override val numPartitions = 
    keyMap.values.sum 

    override def getPartition(key: Any): Int = 
    rrm(key) 
} 

object ByKeyPartitionerWithMultiples { 

    /** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure. 
    * 
    * @param keyMap maps key values to their partition multiples 
    */ 
    def udf(keyMap: Map[String, Int]) = { 
    val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]]) 
    (key:String) => partitioner.getPartition(key) 
    } 

} 

, 당신은 변화를 필요로 하나의 파티션에 여러 개의 이벤트 이름을 병합해야하지만 위의 코드는 당신에게 어떻게 아이디어를 제공 희망 문제에 접근한다.

마지막 관찰은 이벤트 이름의 분포가 시간이 지남에 따라 데이터에 많은 가치가있는 경우 데이터의 일부분에 대한 통계 수집 통과를 수행하여 매핑 테이블을 계산할 수 있다는 것입니다. 필요할 때마다 항상이 작업을 수행 할 필요는 없습니다.이를 확인하기 위해 각 파티션에서 출력 파일의 행 및/또는 크기의 수를 볼 수 있습니다. 즉, Spark 작업의 일부로 전체 프로세스를 자동화 할 수 있습니다.

+0

감사합니다. – vijay

+1

계산 항목 (eventName의 맵)으로 파티션을 다시 분할하는 경우 eventName으로 필터링하는 쿼리 (즉, WHERE eventName == "foo")는 관련 파티션 만 읽을 수 있고 전체 테이블 검색은 수행 할 수 없지만, 이제 더 이상 eventName이 분할되지 않았기 때문에? – vijay

+0

가장 효율적인로드는 파티션 열을 정확하게 필터링 할 때만 발생합니다. 시간이 지남에 따라 왜곡이 안정적이면 정적 매핑을 사용하고 (쿼리 버킷 일 필요는 없으며 쿼리를 수행하는 동안 동일한 함수를 적용합니다) 스큐가 시간이 지남에 따라 안정적이지 않으면 시간 경과에 따라 이벤트 - 파티션 맵의 데이터 구조를 개별적으로 유지 보수해야하며, 쿼리하는 기간 동안 파티션을 병합하여 필터링하면 파티션 수를 효율적으로 줄일 수 있습니다 파티션) 및 이벤트 이름 (파티션 내에서 초점을 맞추기 위해). – Sim