2016-06-27 2 views
3

내가 아주 간단한 스파크 DataFrame을하고 DataFrame의 GROUPBY을 실행할 때 성능이 끔찍 -내 Spark DataFrame이 RDD보다 속도가 느린 이유는 무엇입니까?

내 캐시 DF 단지 두 개의 열입니다 ... RDD reduceByKey (머리로)보다 약 8 배 느리게 동등한, 단지 50,000 행이 고객 이름 : 나는 다음과 같은 두 가지 미리보기를 실행하면

== Physical Plan == 
InMemoryColumnarTableScan [customer#2454,name#2456], InMemoryRelation [customer#2454,name#2456], true, 10000, StorageLevel(true, true, false, true, 1), Scan ParquetRelation[customer#2454,name#2456] InputPaths: hdfs://nameservice1/tmp/v2_selected_parquet/test_parquet2, None 

, 나는 유사한 성능,하지 10S와 85S의 DF 버전에서 실행할 수있는 RDD 버전을 기대 ...

rawtempDF2.rdd.map(lambda x: (x['name'], 1)).reduceByKey(lambda x,y: x+y).collect() 

rawtempDF2.groupby('name').count().collect() 

내가 빠졌습니다. 여기에 정말 근본적인 뭔가? FWIW, RDD 버전은 54 단계, DF 버전은 227입니다./

편집 : Spark 1.6.1 및 Python 3.4.2를 사용하고 있습니다. Edit2 : 또한 소스 쪽매가 고객/day/name으로 분할되었습니다 - 현재 27 명의 고객, 1 일, c. 45 개의 이름.

+0

두 숫자가 너무 높게 보입니다. 이 코드를 실행하고 시간을 측정하려면 어떻게합니까? – zero323

+0

나는 Jupyter 노트북에서 실행 중이 었으며 SparkUI에서 작업을 실행하고있었습니다. 백 엔드는 Mesos (나보다 나은 사람들이 만들었습니다.)이고 Spark 인스턴스는 24 코어와 99GB RAM을 가지고 있습니다. 나는이 모든 것을 초보자이기 때문에 여전히 시간과 테스트에 최선의 방법을 배우고 있습니다. – RichD

답변

5

두 숫자가 비교적 높은 것처럼 보이며 DataFrame을 작성하는 방법이나 시간을 측정하는 방법이 명확하지 않지만 일반적으로 이와 같은 차이는 파티션 수와 비교하여 기록 수가 적음으로 설명 할 수 있습니다.

spark.sql.shuffle.partitions의 기본값은받는 작업 수가 200입니다. 50K 레코드를 사용하면 작업을 시작하는 오버 헤드가 병렬 실행에서 얻을 수있는 속도보다 높습니다. 간단한 예를 들어 설명해 보겠습니다.

import string 
import random 

random.seed(323) 

def random_string(): 
    n = random.randint(3, 6) 
    return (''.join(random.choice(string.ascii_uppercase) for _ in range(n)),) 

df = (sc 
    .parallelize([random_string() for _ in range(50000)], 8).toDF(["name"]) 
    .cache()) 

그리고 shuffle.partitions의 수에 따라 시간을 측정 : 먼저 예를 들어 데이터를 생성 할 수 있습니다이 값은 당신이 주장 무엇에 비교할 수 없습니다 만

sqlContext.setConf("spark.sql.shuffle.partitions", "1") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 504 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "1") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 451 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "100") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 624 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "200") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 778 ms per loop 

sqlContext.setConf("spark.sql.shuffle.partitions", "1000") 
%timeit -n 10 df.groupby('name').count().collect() 
## 10 loops, best of 3: 1.75 s per loop 

이 데이터는 지역에서 수집 된 모드에서는 비교적 명료 한 패턴을 볼 수 있습니다. 동일 RDD에 적용 적절한 분산 환경에서

from operator import add 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1).collect() 
## 10 loops, best of 3: 414 ms per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 10).collect() 
## 10 loops, best of 3: 439 ms per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 100).collect() 
## 10 loops, best of 3: 1.3 s per loop 

%timeit -n 10 df.rdd.map(lambda x: (x['name'], 1)).reduceByKey(add, 1000).collect() 
## 10 loops, best of 3: 8.41 s per loop 

이 때문에 네트워크 IO의 비용이 더 높을 것이다. 그냥 비교

은 또한 데이터의 지역을 살펴한다 스파크

from collections import Counter 

data = df.rdd.flatMap(lambda x: x).collect() 

%timeit -n 10 Counter(data) 
## 10 loops, best of 3: 9.9 ms per loop 

없이 로컬에서이 작업을 실행하는 데 얼마나 걸립니까 확인할 수 있습니다. 사용 및 구성에 따라 저장 공간에 따라 이와 같이 작은 입력이 있어도 작업에 지연이 추가 될 수 있습니다.

+0

일이 잘못 될 것으로 보이는 훌륭한 데모입니다. 100 셔플 실행은 루프 당 31 초 걸렸습니다 (6 노드 메소 클러스터에서 스파크가 실행 됨) - 분명히 잘못되었지만 인프라 직원에게 불평하기 전에 더 큰 데이터 세트/다른 파티션을 사용하여 더 많은 테스트를 수행합니다.) – RichD

+0

참조 : 파티션/평균 실행 시간 : 1/1.53s, 10/2.6s, 100/31.3s, 200/65s – RichD

+0

글쎄, 나는 이것이 당신이 비율을 설명하는 데 필요한 모든 것이라고 확신하지만 절대 값은 길인 것처럼 보인다. 떨어져서. – zero323