6

나는 압축 된 데이터 (문자열)를 약 5GB 정교하게 만들어야하는 PySpark 응용 프로그램을 가지고 있습니다. 나는 RAM에 12 코어 (24 스레드)와 72Gb의 소형 서버를 사용하고 있습니다. 내 PySpark 프로그램은 2 개의지도 작업으로 만 구성되어 있으며 3 개의 매우 큰 정규식 (각각 이미 3GB로 컴파일 됨)과 pickle으로로드됩니다. Spark는 동일한 컴퓨터에서 작업자와 마스터가있는 독립 실행 형 모드로 작업하고 있습니다.스파크는 얼마나 많은 복사본을 만드나요?

제 질문은 : 각 실행기 코어마다 각 변수를 spark 복제합니까? 사용 가능한 모든 메모리를 사용하고 많은 스왑 공간을 사용하기 때문입니다. 아니면 RAM에있는 모든 파티션을로드합니까? RDD는 3 개의 정규식으로 검색해야하는 약 10 백만 개의 문자열을 포함합니다. RDD는 약 1000 개의 파티션을 계산합니다. 몇 분 후에 메모리가 가득 차서 스팍 공간이 매우 느리게 시작되어서이 작업을 마치는 데 어려움이 있습니다. 정규식이 없으면 상황이 동일하다는 것을 알았습니다. ( 약 1 시간 후

import json 
import re 
import twitter_util as twu 
import pickle 

from pyspark import SparkContext 
sc = SparkContext() 

prefix = '/home/lucadiliello' 

source = prefix + '/data/tweets' 
dest = prefix + '/data/complete_tweets' 

#Regex's path 
companies_names_regex = prefix + '/data/comp_names_regex' 
companies_names_dict = prefix + '/data/comp_names_dict' 
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal' 

#Loading the regex's 
comp_regex = pickle.load(open(companies_names_regex)) 
comp_dict = pickle.load(open(companies_names_dict)) 
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal)) 

#Loading the RDD from textfile 
tx = sc.textFile(source).map(lambda a: json.loads(a)) 


def get_device(input_text): 
    output_text = re.sub('<[^>]*>', '', input_text) 
    return output_text 

def filter_data(a): 
    res = {} 
    try: 
     res['mentions'] = a['entities']['user_mentions'] 
     res['hashtags'] = a['entities']['hashtags'] 
     res['created_at'] = a['created_at'] 
     res['id'] = a['id'] 

     res['lang'] = a['lang'] 
     if 'place' in a and a['place'] is not None:  
      res['place'] = {} 
      res['place']['country_code'] = a['place']['country_code'] 
      res['place']['place_type'] = a['place']['place_type'] 
      res['place']['name'] = a['place']['name'] 
      res['place']['full_name'] = a['place']['full_name'] 

     res['source'] = get_device(a['source']) 
     res['text'] = a['text'] 
     res['timestamp_ms'] = a['timestamp_ms'] 

     res['user'] = {} 
     res['user']['created_at'] = a['user']['created_at'] 
     res['user']['description'] = a['user']['description'] 
     res['user']['followers_count'] = a['user']['followers_count'] 
     res['user']['friends_count'] = a['user']['friends_count'] 
     res['user']['screen_name'] = a['user']['screen_name'] 
     res['user']['lang'] = a['user']['lang'] 
     res['user']['name'] = a['user']['name'] 
     res['user']['location'] = a['user']['location'] 
     res['user']['statuses_count'] = a['user']['statuses_count'] 
     res['user']['verified'] = a['user']['verified'] 
     res['user']['url'] = a['user']['url'] 
    except KeyError: 
     return [] 

    return [res] 


results = tx.flatMap(filter_data) 


def setting_tweet(tweet): 

    text = tweet['text'] if tweet['text'] is not None else '' 
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else '' 
    del tweet['text'] 
    del tweet['user']['description'] 

    tweet['text'] = {} 
    tweet['user']['description'] = {} 
    del tweet['mentions'] 

    #tweet 
    tweet['text']['original_text'] = text 
    tweet['text']['mentions'] = twu.find_retweet(text) 
    tweet['text']['links'] = [] 
    for j in twu.find_links(text): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['text']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['text']['companies'] = [] 
    for x in comp_regex.findall(text.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['text']['companies'].append(tmp) 

    # descr 
    tweet['user']['description']['original_text'] = descr 
    tweet['user']['description']['mentions'] = twu.find_retweet(descr) 
    tweet['user']['description']['links'] = [] 
    for j in twu.find_links(descr): 
     tmp = {} 
     try: 
      tmp['host'] = twu.get_host(j) 
      tmp['link'] = j 
      tweet['user']['description']['links'].append(tmp) 
     except ValueError: 
      pass 

    tweet['user']['description']['companies'] = [] 
    for x in comp_regex.findall(descr.lower()): 
     tmp = {} 
     tmp['id'] = comp_dict[x.lower()] 
     tmp['name'] = x 
     tmp['legalName'] = comp_dict_legal[x.lower()] 
     tweet['user']['description']['companies'].append(tmp) 

    return tweet 


res = results.map(setting_tweet) 

res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec") 

UPDATE, 메모리 72기가바이트을 :

는 트위터의 트윗의 모든 쓸모없는 필드를 삭제하고 특정 단어를 트윗의 텍스트와 설명을 스캔, 내 코드입니다) 완전히 완전히 스와핑 (72gb)입니다. 제 경우에는 방송을 사용하는 것이 해결책이 아닙니다.

업데이트 2 pickle과 함께 3 개의 변수를로드하지 않고도 144GB 대신 최대 10GB의 RAM을 사용할 때 문제없이 종료됩니다! (72GB RAM + 72Gb 스왑)

+1

코드는 좋지만 문제없이 답할 수 있습니다 - Spark는 Python 작업자에게 할당 한 많은 스레드 (코어)만큼 로컬 변수 사본을 사용합니다. 거기에 대한 몇 가지 해결 방법이 있지만 일반적으로 꽤 정교합니다. – zero323

+0

코드가 주어지면 드라이버 사본에 +1을 추가해야하고, 드라이버의 절이있는 버전에는 +1을, 실행자 JVM에는 +1을 더하는 것이 좋습니다. 집행자로부터 직접 브로드 캐스트 또는 데이터를로드하여이 기능을 약간 향상시킬 수 있습니다. – zero323

+0

모든 실행 프로그램 프로세스에서 동일한 regex 인스턴스를 메모리에 사용하는 트릭이 없습니까? 거기에 내가 집행자의 수를 줄일 것이라고 생각하지 않으면 ..... –

답변

1

내 질문 : 각 실행 코어의 각 변수를 spark 복제합니까?

예!

각 (로컬) 변수의 복사본 수는 Python 작업자에게 할당 한 스레드 수와 같습니다.


는 문제에 관해서는, pickle를 사용하지 않고 comp_regex, comp_dictcomp_dict_legal을로드하십시오.