2014-10-05 9 views
1

이전 질문에 기초하여 Spark and Python use custom file format/generator as input for RDD 기본적으로 sc.textFile()로 입력을 구문 분석 한 다음 my 또는 일부 라이브러리 사용자 정의 함수를 사용하여 파싱 할 수 있어야한다고 생각합니다.스파크와 파이썬이 gensim을 사용하여 위키 피 디아를 구문 분석하려고합니다.

저는 특히 gensim 프레임 워크를 사용하여 위키피디아 덤프를 구문 분석하려고합니다. 마스터 노드와 모든 작업자 노드에 이미 gensim을 설치했으며 이제는이 질문에 영감을받은 위키 백과 페이지 구문 분석을 위해 gensim 빌드를 사용하고 싶습니다. List (or iterator) of tuples returned by MAP (PySpark).

내 코드는 다음입니다 :

import sys 
import gensim 
from pyspark import SparkContext 


if __name__ == "__main__": 
    if len(sys.argv) != 2: 
     print >> sys.stderr, "Usage: wordcount <file>" 
     exit(-1) 

    sc = SparkContext(appName="Process wiki - distributed RDD") 

    distData = sc.textFile(sys.argv[1]) 
    #take 10 only to see how the output would look like 
    processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) 

    print processed_data 
    sc.stop() 

extract_pages의 소스 코드를 https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py에서 발견하고 그것을 스파크와 함께 작동합니다 보인다 통해 내가는 기반으로 할 수 있습니다.

그러나 불행하게도 나는 다음과 같은 오류 로그지고있어 코드를 실행하면 다음 몇 가지 아마 스파크 로그

14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
File "/root/spark/python/pyspark/worker.py", line 79, in main 
serializer.dump_stream(func(split_index, iterator), outfile) 
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream 
self.serializer.dump_stream(self._batched(iterator), stream) 
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream 
for obj in iterator: 
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched 
for item in iterator: 
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft 
yield next(iterator) 
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages 
elems = (elem for _, elem in iterparse(f, events=("end",))) 
File "<string>", line 52, in __init__ 
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">' 
    org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) 
    org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154) 
    org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) 
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    org.apache.spark.scheduler.Task.run(Task.scala:54) 
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    java.lang.Thread.run(Thread.java:745) 

을 그리고 : 나는 '

14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296 

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

을 내가 Spark없이 성공적으로 시도 했으므로 Spark와 gensim을 조합하여 문제를 해결해야하지만 나는 그 오류를 이해하고있다. gensim wikicorpus.py의 190 번째 줄에 파일이 없습니다.

편집 : 스파크에서

추가 좀 더 로그 :

EDIT2 :

gensim이 xml.etree.cElementTree import iterparse에서 사용 문제를 야기하는 문서 here. 실제로 파일 이름이나 xml 데이터를 포함하는 파일을 기대합니다. RDD가 XML 데이터를 포함하는 파일로 간주 될 수 있습니까?

+0

"[errno를 2] 해당 파일 또는 디렉토리가 IO 오류 :" 그것은

이 ReadableWiki를 살펴 보자 변환하는 많은 자원을 고려하지 않습니다 오는거야? 일반 Python (PySpark 없음)에서이 작업의 간단한 버전을 사용할 수 있습니까? –

+0

PySpark없이 일반 파이썬에서보다 간단한 버전을 실행할 수있었습니다. 불행히도 문제가 될 수있는 유일한 추가 아이디어는 Edit2입니다. – ziky90

+0

'flatMap'은'extact_pages'가리스트를 반환 할 것으로 기대합니다. 발전기로 작동하는지 모르겠습니다. 람다 함수에서'extract_data'의 결과를'list()'로 래핑하려고 시도 했습니까? –

답변

1

나는 보통 Scala의 Spark와 함께 작업합니다. 그럼에도 내 생각은 다음과 같습니다.

sc.textFile을 통해 파일을로드하면 sparkWorkers에 분산되어있는 일종의 라인 반복기입니다. 위키 백과의 xml 형식이 주어진 한 줄이 반드시 분석 가능한 xml 항목과 일치하지 않는다고 생각합니다. 따라서이 문제가 발생합니다.

즉 : 당신은 자신에 각 라인을 구문 분석하려고하면

Line 1 : <item> 
Line 2 : <title> blabla </title> <subitem> 
Line 3 : </subItem> 
Line 4 : </item> 

, 그것은 당신이 가지고있는 것과 같은 예외를 뱉어 것입니다.

나는 대개 위키피디아 덤프를 뒤죽박죽해야한다. 그래서 내가하는 일은 스파크가 쉽게 소화 할 수있는 "재판 버전"으로 바꾸는 것이다. 즉 기사 항목 당 한 줄. 일단 스파크를 먹으면 쉽게 처리 할 수 ​​있고 모든 종류의 처리를 할 수 있습니다.당신은 추적 할 수 있었 없습니다이 오류 - : https://github.com/idio/wiki2vec