When trying to use MongoDB as the input RDD, I get "java.lang.IllegalStateException: not ready" in org.bson.BasicBSONDecoder._decode:
```java
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
JavaPairRDD
```
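The snippet above is cut off after `JavaPairRDD`. For context, a typical mongo-hadoop read from Spark's Java API looks roughly like the sketch below; the class and variable names other than those in the question are illustrative, not a reconstruction of the asker's actual code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

public class MongoRead {
    public static void main(String[] args) {
        // Local 8-core master, matching the multi-threaded setup in the question.
        JavaSparkContext sc = new JavaSparkContext("local[8]", "mongo-read");

        Configuration conf = new Configuration();
        conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");

        // MongoInputFormat splits the collection and yields one
        // (ObjectId, BSONObject) pair per document.
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                conf, MongoInputFormat.class, Object.class, BSONObject.class);

        System.out.println("documents: " + documents.count());
        sc.stop();
    }
}
```

Running this requires a local MongoDB and the mongo-hadoop connector on the classpath, so it is a configuration sketch rather than a self-contained program.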
The exception I get is:

```
14/08/06 09:49:57 INFO rdd.NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false}
14/08/06 09:49:57 WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
	at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
	at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
	at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
	at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
	at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
	at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
	at java.lang.reflect.Method.invoke(Method.java:618)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
	at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
	at java.lang.Thread.run(Thread.java:804)
```
The full program output is here.
Environment:
Red Hat
Spark 1.0.1
Hadoop 2.4.1
MongoDB 2.4.10
mongo-hadoop 1.3
I think I found the problem: mongodb-hadoop has a "static" modifier on its BSON encoder/decoder instances in core/src/main/java/com/mongodb/hadoop/input/MongoInputSplit.java. When Spark runs in multi-threaded mode, all of the threads try to deserialize using the same encoder/decoder instances, with predictably bad results.
Patched it in my GitHub fork here (pull request submitted upstream).
I can now run an 8-core, multi-threaded Spark -> mongo collection count() from Python!
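To see why a static decoder breaks under Spark's thread-per-task executor, here is a minimal, self-contained sketch of the failure mode. `StatefulDecoder` is a hypothetical stand-in for `BasicBSONDecoder` (not the real mongo-hadoop code): like the real decoder, it keeps per-call state and throws `IllegalStateException("not ready")` when a second caller interleaves with an in-flight decode. Sharing one instance across threads (the "static" pattern) races; giving each thread its own instance (the fix) does not.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a stateful, non-thread-safe decoder.
class StatefulDecoder {
    private boolean busy = false;   // per-call state, never meant to be shared

    int decode(int input) throws InterruptedException {
        if (busy) throw new IllegalStateException("not ready");
        busy = true;
        try {
            Thread.sleep(1);        // widen the race window for the demo
            return input * 2;
        } finally {
            busy = false;
        }
    }
}

public class SharedDecoderDemo {
    static final StatefulDecoder SHARED = new StatefulDecoder();  // the buggy "static" pattern

    // Run 8 worker threads; return how many decodes hit "not ready".
    static long failures(boolean useSharedInstance) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 8; i++) {
            pool.submit(() -> {
                // The fix: a fresh decoder per thread instead of the shared static one.
                StatefulDecoder d = useSharedInstance ? SHARED : new StatefulDecoder();
                for (int j = 0; j < 200; j++) {
                    try {
                        d.decode(j);
                    } catch (IllegalStateException e) {
                        errors.add(e);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(60, TimeUnit.SECONDS);
        return errors.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shared decoder failures:   " + failures(true));
        System.out.println("per-instance failures:     " + failures(false));
    }
}
```

With the shared instance, the 8 threads trample each other's `busy` flag and a large fraction of calls throw; with per-instance decoders the count is zero, which mirrors why dropping the `static` modifier makes the Spark job stable.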