2016-10-26 9 views
2

연결된 구성 요소를 사용하려고하지만 크기 조정에 문제가 있습니다. 내 여기 내가 가진거야 -Spark - GraphX ​​- 연결된 구성 요소의 크기 조절

// get vertices 
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache 

// get edges 
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache 

// create graph 
val identityGraph = Graph(vertices, edges) 

// get connected components 
val cc = identityGraph.connectedComponents.vertices 

여기에서 GraphUtil에는 꼭지점과 가장자리를 반환하는 도우미 함수가 있습니다. 이 시점에서 내 그래프는 ~ 백만 노드와 ~ 2 백만 에지 (btw, 이것은 ~ 1 억 노드로 증가 할 것으로 예상됩니다)가 있습니다. 내 그래프는 거의 띄엄 띄엄 연결되어 있으므로 많은 작은 그래프를 기대합니다.

위를 실행할 때 나는 계속 java.lang.OutOfMemoryError: Java heap space을 얻습니다. 내가 executor-memory 32g 시도하고 원사 컨테이너 크기로 45g 15 노드의 클러스터를 실행하고 있습니다. 다음 로그 또한

16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext 
java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360) 
    at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98) 
    at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216) 
    at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32) 
    at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44) 
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146) 
    at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146) 
    at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173) 
    at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31) 
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55) 
    at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181) 
    at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) 

, 내가 얻고 많은 :

16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes 
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes 
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes 

내 질문은 사람이 규모 ConnectedComponents을 시도하고있다 여기

예외 세부입니까? 그렇다면 내가 뭘 잘못하고 있니?

답변

1

연결된 구성 요소 알고리즘은 확장 성이 좋지 않으며 그 성능은 그래프의 토폴로지에 상당히 의존합니다. 가장자리가 희박하더라도 작은 구성 요소가 있다는 것을 의미하지는 않습니다. 긴 줄의 가장자리는 매우 드물지만 (가장자리 수 = 꼭짓점 수 -1), GraphX에서 구현 된 무차별 대결은 매우 효율적이지 않습니다 (출처 : ccpregel 참조). 여기

는 (만 분류 코드)를 시도 할 것입니다 :

  1. 체크 포인트 당신의 꼭지점과 모서리가 (디스크) 마루에, 다음 그래프 구축 다시로드합니다. 실행 계획이 너무 커지면 캐싱이 잘되지 않는 경우가 있습니다.
  2. 알고리즘 결과를 변경하지 않는 방식으로 그래프를 변형하십시오. 예를 들어, code에서 algo가 정보를 양방향으로 전파하고 있음을 알 수 있습니다 (기본값). 따라서 두 개의 꼭지점을 연결하는 가장자리가 여러 개인 경우 해당 알 고를 적용한 그래프에서 필터링하십시오.
  3. 최적화 GraphX ​​코드를 직접 (즉, OOM을 방지하기 위해 각 반복에서 디스크 검사 점) 일반적인 최적화 저장 메모리를 사용하여, (정말 아주 간단하다), 또는

(2 포인트와 유사한) 도메인 별 최적화 GraphX ​​(다소 유산이되고있다)를 떠날 수 있다면 GraphFrame (package, blog )을 고려해 볼 수 있습니다. 나는 결코 시도하지 않았다. 그래서 나는 그것이 CC를 가지고 있는지 모른다.

나는 스파크 패키지 중에서 다른 가능성을 발견 할 수 있지만, 스파크 밖에서 사용할 수도 있습니다. 그러나 이것은 문제의 범위를 벗어납니다.

행운을 빈다.

+1

GraphFrames는 DataFrames와 GraphX를 사용하여 어떻게 이것이 OP에 도움이되는지 이해하지 못합니다. – eliasah

+0

@eliasah GraphF가 GraphX보다 더 최적화되기를 바랍니다. DataFrames에서 실행된다는 사실은 좋은 징표입니다. 그 이유는 촉매 최적화 도구와 텅스텐을 활용할 수 있기 때문입니다. 내가 말했듯이, 나는 시도하지 않았고, 나는 정보통의 희망을 가지고 있습니다. – Wilmerton

+0

GraphX는 기본 그래프 이론이 확장하기 어렵 기 때문에 누구도 작업하고 싶어하지 않는 프로젝트입니다. 불행히도 "거의"죽은 프로젝트입니다. 그래도 GraphFrames는 GraphX보다 훨씬 더 발전 할 것이라고 생각하지 않습니다. – eliasah