使用Mongo-Hadoop连接器通过Apache Spark更新MongoDb中的集合

 ChinaSpecial 发布于 2022-12-15 18:21

我想通过Spark in Java更新MongoDb中的特定集合.我正在使用MongoDB Connector for Hadoop在Java中检索Apache Spark到MongoDb的信息并将其保存.

在关注Sampo Niskanen关于通过Spark检索和保存集合到MongoDb的优秀帖子之后,我对更新集合感到困惑.

MongoOutputFormat.java包含一个构造函数,它使用String [] updateKeys,我猜这是指一个可能的键列表,可以在现有集合上进行比较并执行更新.但是,使用Spark的saveAsNewApiHadoopFile()方法和参数MongoOutputFormat.class,我想知道如何使用该更新构造函数.

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

在此之前,MongoUpdateWritable.java用于执行集合更新.从我在Hadoop上看到的例子来看,这通常是设置的mongo.job.output.value,在Spark中可能是这样的:

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, MongoUpdateWritable.class, MongoOutputFormat.class, config);

但是,我仍然想知道如何指定更新密钥MongoUpdateWritable.java.

不可否认,作为一种hacky方式,我将对象的"_id"设置为我的文档的KeyValue,以便在执行保存时,集合将覆盖具有相同KeyValue的文档_id.

JavaPairRDD analyticsResult; //JavaPairRdd of (mongoObject,result)
JavaPairRDD save = analyticsResult.mapToPair(s -> {
    BSONObject o = (BSONObject) s._1;

    //for all keys, set _id to key:value_
    String id = "";
    for (String key : o.keySet()){
        id += key + ":" + (String) o.get(key) + "_";
    }
    o.put("_id", id);

    o.put("result", s._2);
    return new Tuple2<>(null, o);
});

save.saveAsNewAPIHadoopFile("file:///bogus", Object.class, Object.class, MongoOutputFormat.class, config);

我想通过Spark使用MongoOutputFormat或者执行mongodb集合更新,MongoUpdateWritable或者Configuration理想情况下使用该saveAsNewAPIHadoopFile()方法.可能吗?如果没有,有没有其他方法不涉及专门设置_id到我想要更新的键值?

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有