
I ran into a problem while forwarding the Twitter data collected by a Flume agent to a Spark stream. With Flume alone I can download the tweets just fine, but the error below occurs. I suspect it is a problem with the default UTF-8 decoding in FlumeUtils.createStream(). How can I change it, and what exactly do I need to change? In short: a UTF-8 encoding error occurs while connecting the Flume Twitter stream to Spark in Python (pyspark).

Error:

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 107, in func 
    File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/flume.py", line 36, in utf8_decoder 
    return s.decode('utf-8') 
    File "/usr/lib/python2.7/encodings/utf_8.py", line 16, in decode 
    return codecs.utf_8_decode(input, errors, True) 
UnicodeDecodeError: 'utf8' codec can't decode byte 0xe4 in position 17: invalid continuation byte 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
17/01/01 15:36:41 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 

PySpark code:

from pyspark.sql import SparkSession 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.flume import FlumeUtils 

ss = SparkSession.builder \ 
    .master("local[2]") \ 
    .appName("Stream_Analysis")\ 
    .config("spark.sql.crossJoin.enabled", "true") \ 
    .getOrCreate() 

sc = ss.sparkContext 

strm = StreamingContext(sc, 5) 

flume = FlumeUtils.createStream(strm,"localhost", 9999) 
flume.pprint() 
strm.start() 
strm.awaitTermination() 

Command used to start the PySpark job:

spark-submit --jars ~/project/spark-streaming-flume-assembly_2.11-2.0.2.jar ~/project/news_stream_flume/news_stream_analysis.py localhost 9999 

Flume configuration:

# Name the components on this agent 
FlumeAgent.sources = Twitter 
FlumeAgent.sinks = spark 
FlumeAgent.channels = MemChannel 

# Twitter source 
FlumeAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource 
FlumeAgent.sources.Twitter.consumerKey = x 
FlumeAgent.sources.Twitter.consumerSecret = y 
FlumeAgent.sources.Twitter.accessToken = z 
FlumeAgent.sources.Twitter.accessTokenSecret = xx 
FlumeAgent.sources.Twitter.keywords = flume, spark 

FlumeAgent.sinks.spark.type = avro 
FlumeAgent.sinks.spark.channel = memoryChannel 
FlumeAgent.sinks.spark.hostname = localhost 
FlumeAgent.sinks.spark.port = 9999 
FlumeAgent.sinks.spark.batch-size = 1 

# Use a channel which buffers events in memory 
FlumeAgent.channels.MemChannel.type = memory 
FlumeAgent.channels.MemChannel.capacity = 10000 
FlumeAgent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
FlumeAgent.sources.Twitter.channels = MemChannel 
FlumeAgent.sinks.spark.channel = MemChannel 

Command used to run the Flume agent:

flume-ng agent --name FlumeAgent --conf-file /home/hduser/project/flume_config_2src_spark_avro -f /usr/lib/flume-ng/conf/flume-conf.properties -Dflume.root.logger=DEBUG,console 

Answer


FlumeUtils.createStream takes a bodyDecoder argument, the function used to decode the event body. In Python 2.x you should be able to replace it with your own decoder using whatever encoding you want, or with the identity function to skip decoding completely; see the sketch after the notes below. The default decoder is:

def utf8_decoder(s): 
    """ Decode the unicode as UTF-8 """ 
    if s is None: 
        return None 
    return s.decode('utf-8') 
  • The default implementation checks for None and otherwise decodes as UTF-8; passing the identity function (lambda x: x) skips decoding and leaves the bytes -> unicode mapping to Pyrolite.

  • Python 3.x may need a few extra steps (_.getBytes and a mapping on the JVM side) to work around String handling.
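
For example, a minimal sketch of both options, assuming the hypothetical name tolerant_decoder and the strm StreamingContext from the question's code:

from pyspark.streaming.flume import FlumeUtils 

# Option 1: a lenient decoder that replaces invalid bytes instead of raising 
def tolerant_decoder(s): 
    if s is None: 
        return None 
    # positional errors argument ('replace') also works on Python 2.7 
    return s.decode('utf-8', 'replace') 

flume = FlumeUtils.createStream(strm, "localhost", 9999, 
                                bodyDecoder=tolerant_decoder) 

# Option 2: skip decoding entirely and work with the raw bytes 
# flume = FlumeUtils.createStream(strm, "localhost", 9999, bodyDecoder=lambda x: x) 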