ElasticSearch to Spark RDD

 刘旭 发布于 2022-12-28 17:55

我正在使用elasticsearch中加载的一些测试数据在本地计算机上测试ElasticSearch和Spark集成.

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

代码运行正常并使用esRDD.first()成功返回正确的结果

但是,esRDD.collect()将生成异常:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我相信这与这里提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html 所以我相应地添加了这一行

conf.set("spark.serializer", classOf[KryoSerializer].getName)

我应该做些什么来让它发挥作用吗?谢谢


更新:序列化设置问题已解决.通过使用

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

代替

conf.set("spark.serializer", classOf[KryoSerializer].getName)

现在还有另一个此数据集中有1000条不同的记录

esRDD.count()

但是,返回1000没问题

esRDD.distinct().count()

返回5!如果我打印记录

esRDD.foreach(println)

它正确打印出1000条记录.但如果我使用收集或采取

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

它将打印DUPLICATED记录,并且确实只显示了5个UNIQUE记录,这似乎是整个数据集的随机子集 - 它不是前5个记录.如果我保存RDD并将其读回

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2表现如预期.我想知道是否有一个bug,或者我不了解collect/take的行为.或者是因为我在本地运行所有东西.默认情况下,Spark RDD似乎使用5个分区,如"spark-output"文件的part-xxxx文件数所示.这可能就是为什么esRDD.collect()和esRDD.distinct()返回5个唯一记录,而不是其他一些随机数.但那仍然不对.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有