이전 질문에 기초하여 Spark and Python use custom file format/generator as input for RDD 기본적으로 sc.textFile()로 입력을 구문 분석 한 다음 my 또는 일부 라이브러리 사용자 정의 함수를 사용하여 파싱 할 수 있어야한다고 생각합니다.스파크와 파이썬이 gensim을 사용하여 위키 피 디아를 구문 분석하려고합니다.
저는 특히 gensim 프레임 워크를 사용하여 위키피디아 덤프를 구문 분석하려고합니다. 마스터 노드와 모든 작업자 노드에 이미 gensim을 설치했으며 이제는이 질문에 영감을받은 위키 백과 페이지 구문 분석을 위해 gensim 빌드를 사용하고 싶습니다. List (or iterator) of tuples returned by MAP (PySpark).
내 코드는 다음입니다 :
import sys
import gensim
from pyspark import SparkContext
if __name__ == "__main__":
if len(sys.argv) != 2:
print >> sys.stderr, "Usage: wordcount <file>"
exit(-1)
sc = SparkContext(appName="Process wiki - distributed RDD")
distData = sc.textFile(sys.argv[1])
#take 10 only to see how the output would look like
processed_data = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
print processed_data
sc.stop()
extract_pages의 소스 코드를 https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py에서 발견하고 그것을 스파크와 함께 작동합니다 보인다 통해 내가는 기반으로 할 수 있습니다.
그러나 불행하게도 나는 다음과 같은 오류 로그지고있어 코드를 실행하면 다음 몇 가지 아마 스파크 로그
14/10/05 13:21:11 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <ip address>.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/root/spark/python/pyspark/worker.py", line 79, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/root/spark/python/pyspark/serializers.py", line 196, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/root/spark/python/pyspark/serializers.py", line 127, in dump_stream
for obj in iterator:
File "/root/spark/python/pyspark/serializers.py", line 185, in _batched
for item in iterator:
File "/root/spark/python/pyspark/rdd.py", line 1148, in takeUpToNumLeft
yield next(iterator)
File "/usr/lib64/python2.6/site-packages/gensim/corpora/wikicorpus.py", line 190, in extract_pages
elems = (elem for _, elem in iterparse(f, events=("end",)))
File "<string>", line 52, in __init__
IOError: [Errno 2] No such file or directory: u'<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.9/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.9/ http://www.mediawiki.org/xml/export-0.9.xsd" version="0.9" xml:lang="en">'
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
을 그리고 : 나는 '
14/10/05 13:21:12 ERROR scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/10/05 13:21:12 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
14/10/05 13:21:12 INFO scheduler.DAGScheduler: Failed to run runJob at PythonRDD.scala:296
및
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
을 내가 Spark없이 성공적으로 시도 했으므로 Spark와 gensim을 조합하여 문제를 해결해야하지만 나는 그 오류를 이해하고있다. gensim wikicorpus.py의 190 번째 줄에 파일이 없습니다.
편집 : 스파크에서
추가 좀 더 로그 :
EDIT2 :
gensim이 xml.etree.cElementTree import iterparse
에서 사용 문제를 야기하는 문서 here. 실제로 파일 이름이나 xml 데이터를 포함하는 파일을 기대합니다. RDD가 XML 데이터를 포함하는 파일로 간주 될 수 있습니까?
"[errno를 2] 해당 파일 또는 디렉토리가 IO 오류 :" 그것은
이 ReadableWiki를 살펴 보자 변환하는 많은 자원을 고려하지 않습니다 오는거야? 일반 Python (PySpark 없음)에서이 작업의 간단한 버전을 사용할 수 있습니까? –
PySpark없이 일반 파이썬에서보다 간단한 버전을 실행할 수있었습니다. 불행히도 문제가 될 수있는 유일한 추가 아이디어는 Edit2입니다. – ziky90
'flatMap'은'extact_pages'가리스트를 반환 할 것으로 기대합니다. 발전기로 작동하는지 모르겠습니다. 람다 함수에서'extract_data'의 결과를'list()'로 래핑하려고 시도 했습니까? –