我有一个mapper方法:
def mapper(value): ... for key, value in some_list: yield key, value
实际上,我需要的是与普通的wordcount示例相差不远.我已经有了工作脚本,但只有当mapper方法看起来像这样:
def mapper(value): ... return key, value
这就是它的调用方式:
sc.textFile(sys.argv[2], 1).map(mapper).reduceByKey(reducer).collect()
我花了2个小时尝试编写支持mapper生成器的代码.但不能这样做.我甚至同意只返回一份清单:
def mapper(value): ... result_list = [] for key, value in some_list: result_list.append( key, value ) return result_list
在这里:https://groups.google.com/forum/#!searchin/spark-users/flatmap $ 20multiple/spark-users/1WqVhRBaJsU/-D5QRbenlUgJ我发现我应该使用flatMap,但它没有做到这一点 - 然后我的reducer开始得到像(key1,value1,key2,value2,value3,...)这样的输入 - 但它应该是[(key1,value1),(key2,value2,value3)...].换句话说,reducer开始只使用单个部分,并且不知道它是值还是键,以及值是否 - 它属于哪个键.
那么如何使用返回迭代器或列表的映射器呢?
谢谢!
flatMap
如果需要返回多个输出的map函数,可以使用.
传递给的函数flatMap
可以返回一个iterable:
>>> words = sc.textFile("README.md") >>> def mapper(line): ... return ((word, 1) for word in line.split()) ... >>> words.flatMap(mapper).take(4) [(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)] >>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y) >>> counts.take(5) [(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]
它也可以是一个发电机功能:
>>> words = sc.textFile("README.md") >>> def mapper(line): ... for word in line.split(): ... yield (word, 1) ... >>> words.flatMap(mapper).take(4) [(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)] >>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y) >>> counts.take(5) [(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]
您提到过您尝试了flatMap
但它将所有内容压缩到列表[key, value, key, value, ...]
而不是[(key, value), (key, value)...]
键值对列表.我怀疑这是你的地图功能中的一个问题.如果您仍然遇到此问题,可以发布更完整版的地图功能吗?