0

카프카 스파크 스트리밍 작업. Producer에서 전송 된 데이터를 읽고 처리 할 수 ​​있습니다. 여기에 시나리오가 있는데, Producer가 메시지를 생성하고 Consumer가 잠시 동안 꺼져 있고 스위치가 켜져 있다고 가정 할 수 있습니다. 이제 Conumser는 라이브 데이터를 읽는 중입니다. 대신, 그것은 읽은 곳에서부터 데이터를 유지해야합니다. 다음은 사용하고있는 pom.xml입니다.카프카 스파크 스트리밍 오프셋 문제

<properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <spark.version>2.0.1</spark.version> 
     <kafka.version>0.8.2.2</kafka.version> 
    </properties> 


    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.11</artifactId> 
      <version>1.6.2</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json/json --> 
     <dependency> 
      <groupId>org.json</groupId> 
      <artifactId>json</artifactId> 
      <version>20160810</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 --> 
     <dependency> 
      <groupId>org.json4s</groupId> 
      <artifactId>json4s-ast_2.11</artifactId> 
      <version>3.2.11</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.2.0</version> 
     </dependency> 

나는 Kafka-v0.10.1.0 Producer and Conumser와 함께 작업 해 보았습니다. 동작은 예상대로입니다 (소비자는 왼쪽에서 데이터를 읽습니다). 따라서이 버전에서는 오프셋이 올바르게 선택됩니다.

위의 pom.xml에서도 동일한 버전을 사용해 보았지만 java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker은 실패했습니다.

버전의 호환성을 알고 있지만 연속 스트림을 찾고 있습니다.

+0

제 답변을 살펴 보셨습니까? 문제가 해결 되었습니까? – oh54

답변

0

카프카가 버전 0.8과 0.10 사이에서 다소 큰 변화를 겪었 기 때문에 다른 행동이 일어날 수 있습니다.

이전 버전을 꼭 사용해야하는 경우가 아니라면 최신 버전으로 전환하는 것이 좋습니다.

카프카 프로젝트

https://spark.apache.org/docs/latest/streaming-kafka-integration.html는 버전 0.8과 0.10 사이에 새로운 소비자 API를 도입, 그래서 2 개 개의 분리 대응하는 스파크 스트리밍 패키지를 사용할 수있다 :

이 링크에서보세요. 당신은 카프카의 v0.10.1.0을 사용하려면

, 당신은 이렇게 https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11에서 통합 의존성을 스트리밍 일부 카프카의 불꽃을 지정해야합니다. 예를 들어이 같은

뭔가 :

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

추가 참고 : 당신은 10 월 2013 년 출시와 하둡 측면에서, 따라서 고대가되었습니다 하둡 2.2.0을 사용하고, 당신은 그것을 교환을 고려해야 최신 버전.

도움이 될지 알려주세요.