2017-01-09

Apache Spark: reduceByKey function stops the Java application

Even after searching for a long time I can't find anything, so I have to ask! I want to do a simple word count of hashtags from tweets with Apache Spark. The application gets the hashtags from Kafka, and everything works fine up to the reduceByKey function. What I need is for similar hashtags to be counted together and displayed.

------------------------------------------- 
Time: 1483986210000 ms 
------------------------------------------- 
(Presse,1) 
(Trump,1) 
(TheResistanceGQ,1) 
(MerylStreep,1) 
(theresistance,1) 
(Theranos,1) 
(Russian,1) 
(Trump,1) 
(trump,1) 
(Üstakıl,1) 
... 

The output above is what I get without the reduceByKey function; for example, the two (Trump,1) lines should be merged into (Trump,2).

So I need the reduceByKey function (I know that there is also a direct connection between Twitter and Spark), but as soon as I use it I get an error. Here is my code:

package org.apache.spark.examples.streaming; 

import java.util.HashMap; 
import java.util.HashSet; 
import java.io.FileOutputStream; 
import java.io.PrintStream; 
import java.time.Duration; 
import java.util.Arrays; 
import java.util.Iterator; 
import java.util.Map; 
import java.util.Map.Entry; 
import java.util.Set; 
import java.util.regex.Pattern; 

import scala.Tuple2; 

import kafka.serializer.StringDecoder; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.apache.spark.streaming.Durations; 
import org.apache.log4j.Logger; 

/** 
* Consumes messages from one or more topics in Kafka and does wordcount. 
*/ 

public final class JavaDirectKafkaWordCount { 
    private static final Pattern SPACE = Pattern.compile(" "); 

    public static void main(String[] args) throws Exception { 

     String brokers = "XXX.XXX.XXX.XXX:9092"; 
     String topics = "topicMontag"; 

     // Create context with a 2 seconds batch interval 
     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

     Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); 
     Map<String, String> kafkaParams = new HashMap<>(); 
     kafkaParams.put("metadata.broker.list", brokers); 
     kafkaParams.put("group.id", "1"); 
     kafkaParams.put("auto.offset.reset", "smallest"); 

     // Create direct kafka stream with brokers and topics 
     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class, 
       StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet); 

     messages.foreachRDD(rdd -> { 
      System.out.println(
        "--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records"); 
      // rdd.foreach(record -> System.out.println(record._2)); 
     }); 

     // Extract the message value (the tweet/hashtag text) from each Kafka (key, value) pair
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 

     // Split every line on spaces into individual words/hashtags
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
      @Override 
      public Iterator<String> call(String x) { 
       return Arrays.asList(SPACE.split(x)).iterator(); 
      } 
     }); 

     // Map every word to a (word, 1) pair
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() { 
      @Override 
      public Tuple2<String, Integer> call(String s) { 
       return new Tuple2<>(s, 1); 
      } 
     }); 

     // Sum the counts per word - this is the step that fails
     JavaPairDStream<String, Integer> result = wordCounts.reduceByKey(new Function2<Integer, Integer, Integer>() { 
      @Override 
      public Integer call(Integer i1, Integer i2) { 
       return new Integer(i1 + i2); 
      } 
     }); 

     //wordCounts.print(); 
     result.print(); 
     // PrintStream out = new PrintStream(new 
     // FileOutputStream("output.txt")); 
     // System.setOut(out); 

     // Start the computation 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 
This is the error output I get:

17/01/09 19:28:54 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at JavaDirectKafkaWordCount.java:106) finished in 0,377 s 
17/01/09 19:28:54 INFO DAGScheduler: looking for newly runnable stages 
17/01/09 19:28:54 INFO DAGScheduler: running: Set() 
17/01/09 19:28:54 INFO DAGScheduler: waiting: Set(ResultStage 1) 
17/01/09 19:28:54 INFO DAGScheduler: failed: Set() 
17/01/09 19:28:54 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at JavaDirectKafkaWordCount.java:113), which has no missing parents 
17/01/09 19:28:54 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 899.7 MB) 
17/01/09 19:28:54 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1948.0 B, free 899.7 MB) 
17/01/09 19:28:54 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXX.XXX.XXX.XXX:56435 (size: 1948.0 B, free: 899.7 MB) 
17/01/09 19:28:54 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012 
17/01/09 19:28:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at JavaDirectKafkaWordCount.java:113) 
17/01/09 19:28:54 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 
17/01/09 19:28:54 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0, ANY, 5800 bytes) 
17/01/09 19:28:54 INFO Executor: Running task 0.0 in stage 1.0 (TID 2) 
17/01/09 19:28:54 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 
17/01/09 19:28:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms 
17/01/09 19:28:54 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2) 
java.lang.NoClassDefFoundError: net/jpountz/util/SafeUtils 
    at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:124) 
    at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2338) 
    at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2351) 
    at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2822) 
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) 
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301) 
... 

And here is my pom.xml:

<dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>3.8.1</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.2.2</version> 
    </dependency> 
    <dependency> 
     <groupId>org.twitter4j</groupId> 
     <artifactId>twitter4j-stream</artifactId> 
     <version>4.0.4</version> 
    </dependency> 
    <dependency> 
     <groupId>com.twitter</groupId> 
     <artifactId>hbc-core</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>2.0.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-xml</artifactId> 
     <version>2.11.0-M4</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-examples_2.10</artifactId> 
     <version>1.0.0</version> 
    </dependency> 
</dependencies> 

Maybe someone has an idea? Thank you ...

+1

'NoClassDefFoundError' has a very specific meaning and cause. What does the rest of the log output say? – nitind

Answer

0

The problem is an incorrect pom.xml definition.

First of all, the scala-xml dependency: <version>2.11.0-M4</version> will give you another error, so either

  1. use scala-xml in a 2.10 version, OR
  2. upgrade Spark to Scala 2.11.

Your actual problem is that you are using the dependency org.apache.kafka:kafka_2.10:jar:0.8.2.2, which has net.jpountz.lz4:lz4:jar:1.2.0 as a transitive dependency, while Spark 2 uses net.jpountz.lz4:lz4:jar:1.3.0. Unfortunately, Maven resolves this library to the lower version, which does not contain the net/jpountz/util/SafeUtils class that Spark needs at runtime, hence the NoClassDefFoundError. Therefore:

  1. Remove the explicit Kafka dependency; spark-streaming-kafka already pulls Kafka in.
  2. Provide a consistent Spark version for all Spark artifacts (spark-streaming, spark-core, spark-streaming-kafka, spark-examples); they should all have the same version.

To investigate dependency problems, these Maven goals help:
    1. mvn dependency:tree
    2. mvn dependency:resolve
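
    For example, mvn dependency:tree -Dincludes=net.jpountz.lz4 (using the dependency plugin's includes filter) should show which artifact drags in the conflicting lz4 version.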

    Example pom.xml:

    <dependencies> 
        <dependency> 
         <groupId>junit</groupId> 
         <artifactId>junit</artifactId> 
         <version>3.8.1</version> 
         <scope>test</scope> 
        </dependency> 
        <dependency> 
         <groupId>org.twitter4j</groupId> 
         <artifactId>twitter4j-stream</artifactId> 
         <version>4.0.4</version> 
        </dependency> 
        <dependency> 
         <groupId>com.twitter</groupId> 
         <artifactId>hbc-core</artifactId> 
         <version>2.2.0</version> 
        </dependency> 
        <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-streaming_2.10</artifactId> 
         <version>2.0.1</version> 
        </dependency> 
        <dependency> 
         <groupId>org.apache.spark</groupId> 
         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId> 
         <version>2.1.0</version> 
        </dependency> 
    </dependencies> 
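
    If for some reason you have to keep the direct kafka_2.10 dependency, a possible alternative (just a sketch, not verified against this exact setup) is to exclude the conflicting lz4 artifact from it, so that the 1.3.0 version pulled in by Spark wins:

        <dependency> 
         <groupId>org.apache.kafka</groupId> 
         <artifactId>kafka_2.10</artifactId> 
         <version>0.8.2.2</version> 
         <exclusions> 
          <!-- keep kafka from pulling in lz4 1.2.0; Spark 2 already brings lz4 1.3.0 --> 
          <exclusion> 
           <groupId>net.jpountz.lz4</groupId> 
           <artifactId>lz4</artifactId> 
          </exclusion> 
         </exclusions> 
        </dependency> 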
    

+0

I tried many different versions of the dependencies, but this combination works fine! Thank you! – Eddy