Here is my code:
    # map function
    def echo(lines):
        if lines:
            for i, line in enumerate(lines):
                print i, "=>", line

    # load the data
    idsFile = "ids.txt"  # Should be some file on your system
    linesData = sc.textFile(idsFile).cache()

    # clean it
    cleanLinesData = linesData.map(lambda line: line.strip())
    filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

    # setup task
    answers = filteredLinesData.mapPartitions(echo, 2)  # split file into 2 pieces
And the file ids.txt:
    1,2,3,4,5,1
    6,4,8,3,2
    9,9,9,9,9,9
    100000000,1
    10,10,10
    1,2,4,2
To run it, I start the shell with:

    $ IPYTHON=1 pyspark --master local[2]
Then I %cpaste the code in (is there a better way to do this?).
If I just call take() and look at the values, I get sensible output:
    In [2]: filteredLinesData.take(6)
    Out[2]:
    [u'1,2,3,4,5,1',
     u'6,4,8,3,2',
     u'9,9,9,9,9,9',
     u'100000000,1',
     u'10,10,10',
     u'1,2,4,2']
But when I try to actually execute the mapPartitions() job I set up, it fails:
    In [3]: executed = answers.collect()
    14/09/12 11:18:22 INFO SparkContext: Starting job: collect at:1
    14/09/12 11:18:22 INFO DAGScheduler: Got job 2 (collect at :1) with 2 output partitions (allowLocal=false)
    14/09/12 11:18:22 INFO DAGScheduler: Final stage: Stage 2(collect at :1)
    14/09/12 11:18:22 INFO DAGScheduler: Parents of final stage: List()
    14/09/12 11:18:22 INFO DAGScheduler: Missing parents: List()
    14/09/12 11:18:22 INFO DAGScheduler: Submitting Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37), which has no missing parents
    14/09/12 11:18:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37)
    14/09/12 11:18:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
    14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
    14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:0 as 3112 bytes in 1 ms
    14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
    14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:1 as 3112 bytes in 0 ms
    14/09/12 11:18:22 INFO Executor: Running task ID 0
    14/09/12 11:18:22 INFO Executor: Running task ID 1
    14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
    14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
    14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_1 not found, computing it
    14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_0 not found, computing it
    14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:31+32
    14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:0+31
    14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(288) called with curMem=133256, maxMem=308910489
    14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_1 stored as values to memory (estimated size 288.0 B, free 294.5 MB)
    14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(304) called with curMem=133544, maxMem=308910489
    14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 304.0 B, free 294.5 MB)
    14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_0 in memory on 18.111.61.9:58306 (size: 304.0 B, free: 294.6 MB)
    14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_1 in memory on 18.111.61.9:58306 (size: 288.0 B, free: 294.6 MB)
    14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_0
    14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_1
    0 => 1,2,3,4,5,1
    1 => 6,4,8,3,2
    2 => 9,9,9,9,9,9
    0 => 100000000,1
    1 => PySpark worker failed with exception:10,10,10
    2 => 1,2,4,2
    PySpark worker failed with exception:
    Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
    Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
    14/09/12 11:18:22 ERROR Executor: Exception in task ID 1
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        at org.apache.spark.api.python.PythonRDD$$anon$1. (PythonRDD.scala:145)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    14/09/12 11:18:22 ERROR Executor: Exception in task ID 0
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        at org.apache.spark.api.python.PythonRDD$$anon$1. (PythonRDD.scala:145)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    14/09/12 11:18:22 WARN TaskSetManager: Lost TID 0 (task 2.0:0)
    14/09/12 11:18:22 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
        at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        at org.apache.spark.api.python.PythonRDD$$anon$1. (PythonRDD.scala:145)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    14/09/12 11:18:22 ERROR TaskSetManager: Task 2.0:0 failed 1 times; aborting job
    14/09/12 11:18:22 INFO TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
     [duplicate 1]
    14/09/12 11:18:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
    14/09/12 11:18:22 INFO DAGScheduler: Failed to run collect at :1
    14/09/12 11:18:22 INFO TaskSchedulerImpl: Cancelling stage 2
    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    in ()
    ----> 1 executed = answers.collect()

    /usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/rdd.pyc in collect(self)
        581         """
        582         with _JavaStackTrace(self.context) as st:
    --> 583           bytesInJava = self._jrdd.collect().iterator()
        584         return list(self._collect_iterator_through_file(bytesInJava))
        585

    /usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
        535         answer = self.gateway_client.send_command(command)
        536         return_value = get_return_value(answer, self.gateway_client,
    --> 537                 self.target_id, self.name)
        538
        539         for temp_arg in temp_args:

    /usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        298                 raise Py4JJavaError(
        299                     'An error occurred while calling {0}{1}{2}.\n'.
    --> 300                     format(target_id, '.', name), value)
        301             else:
        302                 raise Py4JError(

    Py4JJavaError: An error occurred while calling o38.collect.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
        self.serializer.dump_stream(self._batched(iterator), stream)
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
        for obj in iterator:
      File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
        for item in iterator:
    TypeError: 'NoneType' object is not iterable
            org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
            org.apache.spark.api.python.PythonRDD$$anon$1. (PythonRDD.scala:145)
            org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
            org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
            org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
            org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
            org.apache.spark.scheduler.Task.run(Task.scala:51)
            org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
            java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
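It is clearly doing the right thing with the input (every line gets printed), but for some reason Spark then ends up feeding the mapPartitions() function something that causes the error TypeError: 'NoneType' object is not iterable.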
Any ideas what I'm doing wrong? I'm at a complete loss.
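One way to narrow this down without Spark is to replay the last frame of the traceback in plain Python. The sketch below assumes the worker does nothing more than iterate over whatever the function passed to mapPartitions() returns, which is what the `func(split_index, iterator)` and `for item in iterator` frames suggest. The names `consume`, `echo_returns_none`, and `echo_yields` are made up for illustration and are not PySpark APIs.

```python
def echo_returns_none(lines):
    # Same shape as echo() above: it loops over the lines but never
    # returns or yields anything, so calling it evaluates to None.
    for i, line in enumerate(lines):
        pass  # the original prints here


def echo_yields(lines):
    # Generator variant: calling it returns an iterator that
    # produces one value per input line.
    for i, line in enumerate(lines):
        yield "%d => %s" % (i, line)


def consume(func, partition):
    # Hypothetical stand-in for the worker loop in the traceback:
    # iterate over the result of calling func on the partition.
    return [item for item in func(iter(partition))]


partition = ["1,2,3,4,5,1", "6,4,8,3,2"]

try:
    consume(echo_returns_none, partition)
    error = None
except TypeError as exc:
    error = str(exc)

result = consume(echo_yields, partition)
```

If that assumption holds, the first call fails the same way the job does, only after the loop body has already run for every line, which would match the prints appearing before the exception. The generator variant hands back something iterable instead.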