我现在正在试验Spark和Mongodb,它使用mongodb-hadoop连接器来桥接火花和mongodb通信.以下是https://github.com/plaa/mongo-spark的示例,此示例适用于我.
然后基于这个例子,我使用了来自https://github.com/10gen-interns/big-data-exploration的更大的数据集,该数据集有600万条飞行数据记录.我想要做的是查询mongodb数据集,然后进行一些进一步的处理.
航班数据的架构位于https://gist.github.com/sweetieSong/6016700
看数据示例:
{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ), "age" : 27, "airTime" : 316, "airlineId" : 19805, "arrDelay" : -37, "arrTime" : Date( 1336304580000 ), "carrier" : "AA", "carrierId" : "AA", "crsArrTime" : Date( 1336306800000 ), "crsDepTime" : Date( 1336294800000 ), "crsElapsedTime" : 380, "date" : Date( 1336262400000 ), "dayOfMonth" : 6, "dayOfWeek" : 7, "depDelay" : -5, "depTime" : Date( 1336294500000 ), "destAirport" : "LAX", "destAirportId" : 12892, "destCity" : "Los Angeles, CA", "destCityId" : 32575, "destState" : "California", "destStateId" : "CA", "destWAC" : 91, "distance" : 2475, "diverted" : true, "elapsedTime" : 348, "flightNum" : 1, "month" : 5, "numDivAirportLandings" : 0, "numFlights" : 1, "origAirport" : "JFK", "origAirportId" : 12478, "origCity" : "New York, NY", "origCityId" : 31703, "origState" : "New York", "origStateId" : "NY", "origWAC" : 22, "quarter" : 2, "tailNum" : "N323AA", "taxiIn" : 19, "taxiOut" : 13, "wheelsOff" : Date( 1336295280000 ), "wheelsOn" : Date( 1336303440000 ), "year" : 2012 }
我的scala代码是
val sc = new SparkContext("local", "Scala Word Count") val config = new Configuration() config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights") config.set("mongo.input.query","{destAirport: 'LAX'}"); //config.set("mongo.input.query","{_id.destAirport: 'LAX'}"); val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
println("我们正在运行scala..count",mongoRDD.count())
出于测试目的,我只想先从destAirport'LAX'获取所有记录,我不知道查询是怎样的,所以我尝试了两种不同格式的查询,"{destAirport:'LAX'}"和"{ _id.destAirport:'LAX'}"
运行应用程序时,控制台会输出此类信息
INFO MongoCollectionSplitter:创建分割:min = {"_ id":{"$ oid":"51bf29d8ca69141e42097d7f"}},max = {"_ id":{"$ oid":"51bf29dfca69141e420991ad"}}
14/08/05 10:30:51 INFO Executor: Running task ID 751 14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192) 14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750) 14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally 14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false} 14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from: 14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false} 14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597 14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver 14/08/05 10:30:51 INFO Executor: Finished task ID 751
无论查询是什么(甚至不设置查询),spark总是执行1191任务.每个任务都会输出相似的单词.和mongoRDD.count()始终输出0.
我的第一个问题是什么是正确的查询?
此外我以前认为mongodb-hadoop做的是,mongodb首先查询所有集合,然后将结果发送回spark进行处理.但是现在在我看来,mongodb会将这个集合拆分成很多,然后查询那个小部分的集合,然后将该部分的结果发送给spark.是吗 ?
我的第一个问题是什么是正确的查询?
我认为没有"正确"的查询 - 您需要根据要处理的数据进行查询
此外我以前认为mongodb-hadoop做的是,mongodb首先查询所有集合,然后将结果发送回spark进行处理.但是现在在我看来,mongodb会将这个集合拆分成很多,然后查询那个小部分的集合,然后将该部分的结果发送给spark.是吗 ?
我遇到了同样的问题.
我相信,给定MongoInputSplit.class的newAPIHadoopRDD在计算拆分时不考虑查询.它仅在计算分割后应用.这意味着无论您的查询多么精简,分割的数量将保持不变,并且将与集合的大小成比例.
newAPIHadoopRDD正在使用StandaloneMongoSplitter.请注意,此类未使用查询来计算拆分边界.它只是使用mongo的内部"splitVector"命令; 从这里的文档 - http://api.mongodb.org/internal/current/commands.html,它看起来也不考虑查询.
我没有一个好的解决方案.更好的方法是在计算查询后才拆分mongo集合,但这需要拆分器的另一个实现.以下是关于该问题的一些很好的解读:http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/