2016-06-15 4 views
1

My Spark 클러스터에는 마스터가 1 명, 작업자가 2 명 있습니다. 응용 프로그램은 s3에서 DataFrames로 csv 파일을 읽어 임시 테이블로 등록하고 sqlContext를 사용하여 sql 쿼리를 실행하여 새 DataFrames를 만듭니다. 그런 다음이 DF는 MySql DB에 저장됩니다. 이러한 작업은 모두 여러 노드에서 실행됩니다.하나의 실행 프로그램 만 사용하여 실행중인 스파크 작업이있는 이유는 무엇입니까?

그러나이 테이블을 DB에서 DataFrames로 읽어 들인 후 임시 테이블로 등록하고 sqlContext 쿼리를 실행하면 모든 노드가 하나의 노드로 처리됩니다. 무엇이 이것을 일으킬 수 있습니까?

DataFrame a = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/a/part*"); 
DataFrame b = sqlContext.read().format("com.databricks.spark.csv").options(options) 
       .load("s3://s3bucket/b/part*"); 

a.registerTempTable("a"); 
b.registerTempTable("b"); 

DataFrame c = sqlContext.sql("SELECT a.name, b.name from a join b on a.id = b.a_id"); 

c.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "c", prop); 

// other jobs are similar 

Map<String, String> dOptions = new HashMap<String, String>(); 
dOptions.put("driver", MYSQL_DRIVER); 
dOptions.put("url", MYSQL_CONNECTION_URL); 

dOptions.put("dbtable", "(select * from c) AS c"); 
rC= sqlContext.read().format("jdbc").options(dOptions).load(); 
rC.cache(); 

dOptions.put("dbtable", "(select * from d) AS d"); 
rD= sqlContext.read().format("jdbc").options(dOptions).load(); 
rD.cache(); 

dOptions.put("dbtable", "(select * from f) AS f"); 
rF= sqlContext.read().format("jdbc").options(dOptions).load(); 
rF.cache(); 

rC.registerTempTable("rC"); 
rD.registerTempTable("rD"); 
rF.registerTempTable("rF"); 

DataFrame result = sqlContext.sql("SELECT rC.name, rD.name, rF.date from rC join rD on rC.name = rD.name join rF on rC.date = rF.date"); 

result.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "result_table", prop); 
+1

직장을 제출하는 데 무엇을 사용하고 있습니까? "하나의 노드"를 의미 할 때 마스터 UI에서 하나의 작업자 만 볼 수 있다는 의미입니까? –

+0

@Hawknight 작업 제출을 위해 spark-submit을 사용하고 있습니다. 다음은 전체 명령입니다 : "spark-submit --class MyClass --deploy-mode cluster s3 : //bucket/file.jar". 저는 Ganglia와 Spark UI를 통한 모니터 니아입니다. 두 사람 모두 근로자를 인정하고 일부 일자리가 평행선에서 처형되는 것을 봅니다. 그러나 위에 게시 한 작업 중 paralelization이 중지되고 특정 작업 (스테이지 내)은 하나의 작업자 노드에서만 수행됩니다. 다음은 그것을 보여주는 Ganglia UI의 스크린 샷입니다. http://pokit.org/get/img/2a5bcd853b97aad2bc9e86a90c9b2733.png – KikiRiki

+0

코드에만 기반하여 왜 특정 작업 단계에서 한 명의 작업자 만 작업하는지 알기는 어렵습니다. 어떤 단계에서 문제를 정확히 지적하고 문제가 발생하는지 파악할 수 있습니까? –

답변

0

당신이 우리와 함께 SparkConf() 객체를 공유 할 수 없습니다 : 여기

내 코드의 예입니다?

SparkConf() 개체에는 Spark 응용 프로그램의 구성이 포함되어 있습니다. 그것은 다음과 같은 키값 쌍 각종 점화 파라미터를 설정하기 위해 사용된다 : 실행기 코어

실행기 넘버의

넘버

-THE 마스터

-heap 메모리

할당

- 기타 ..

+1

여기 내 SparkConf 개체 : SparkConf conf = new sparkConf(). setAppName ("org.spark.SchemaTransformer"). setMaster ("yarn-cluster"); 실행 환경 : spark.master = yarn executor.cores = 4 executor.memory = 5120M – KikiRiki

+1

일부 인수가 누락 된 것으로 보입니다. 다음 방법을 추가하십시오. 예 : SparkConf(). ("master", "yarn") \ .set ("spark.submit.deployMode", "cluster") .set ("spark.executor.instances", "8" ("spark.executor.memory", "5120M") \ .set ("spark.river.memory", " 5120M ") \ .set ("spark.yarn.memoryOverhead ","10000M ") \ .set ("spark.yarn.driver.memoryOverhead ","10000M ") – theudbald