热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkmongojava_java操作spark读写mongodb

首先要引入mongodb-spark-connector的maven依赖,具体的可见这个api网址:https:docs.mongodb.comspar

首先要引入mongodb-spark-connector的maven依赖,具体的可见这个api网址:https://docs.mongodb.com/spark-connector/current/java-api/,然后基本上就可以按照api上面的内容来进行spark操作了。这里面已经有spark读入mongodb数据转化为rdd的操作了。

有一些补充的或许有用(?)的代码,放在这里。

import com.mongodb.MongoClient;

import com.mongodb.MongoClientURI;

import com.mongodb.client.MongoDatabase;

import com.mongodb.spark.MongoConnector;

import com.mongodb.spark.MongoSpark;

import com.mongodb.spark.config.ReadConfig;

import com.mongodb.spark.config.WriteConfig;

import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

import com.mongodb.spark.sql.helpers.StructFields;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.types.DataTypes;

import org.apache.spark.sql.types.StructType;

import org.bson.Document;

import org.bson.types.ObjectId;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import static java.lang.String.format;

import static java.util.Arrays.asList;

import static java.util.Collections.singletonList;

public final class JavaIntroduction {

/**

* Run this main method to see the output of this quick example.

*

* @param args takes an optional single argument for the connection string

* @throws InterruptedException if a latch is interrupted

*/

public static void main(final String[] args) throws InterruptedException {

JavaSparkContext jsc = createJavaSparkContext(args);

// Create a RDD

JavaRDD documents = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map

(new Function() {

@Override

public Document call(final Integer i) throws Exception {

return Document.parse("{test: " + i + "}");

}

});

// Saving data from an RDD to MongoDB

MongoSpark.save(documents);

// Saving data with a custom WriteConfig

Map writeOverrides = new HashMap();

writeOverrides.put("collection", "spark");

writeOverrides.put("writeConcern.w", "majority");

WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeOverrides);

JavaRDD sparkDocuments = jsc.parallelize(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).map

(new Function() {

@Override

public Document call(final Integer i) throws Exception {

return Document.parse("{spark: " + i + "}");

}

});

// Saving data from an RDD to MongoDB

MongoSpark.save(sparkDocuments, writeConfig);

// Loading and analyzing data from MongoDB

JavaMongoRDD rdd = MongoSpark.load(jsc);

System.out.println(rdd.count());

System.out.println(rdd.first().toJson());

// Loading data with a custom ReadConfig

Map readOverrides = new HashMap();

readOverrides.put("collection", "spark");

readOverrides.put("readPreference.name", "secondaryPreferred");

ReadConfig readConfig = ReadConfig.create(jsc).withOptions(readOverrides);

JavaMongoRDD customRdd = MongoSpark.load(jsc, readConfig);

System.out.println(customRdd.count());

System.out.println(customRdd.first().toJson());

// Filtering an rdd using an aggregation pipeline before passing data to Spark

JavaMongoRDD aggregatedRdd = rdd.withPipeline(singletonList(Document.parse("{ $match: { test : { $gt : 5 } } }")));

System.out.println(aggregatedRdd.count());

System.out.println(aggregatedRdd.first().toJson());

// Datasets

// Drop database

dropDatabase(getMongoClientURI(args));

// Add Sample Data

List characters = asList(

"{'name': 'Bilbo Baggins', 'age': 50}",

"{'name': 'Gandalf', 'age': 1000}",

"{'name': 'Thorin', 'age': 195}",

"{'name': 'Balin', 'age': 178}",

"{'name': 'K铆li', 'age': 77}",

"{'name': 'Dwalin', 'age': 169}",

"{'name': '脫in', 'age': 167}",

"{'name': 'Gl贸in', 'age': 158}",

"{'name': 'F铆li', 'age': 82}",

"{'name': 'Bombur'}"

);

MongoSpark.save(jsc.parallelize(characters).map(new Function() {

@Override

public Document call(final String json) throws Exception {

return Document.parse(json);

}

}));

// Load inferring schema

Dataset df = MongoSpark.load(jsc).toDF();

df.printSchema();

df.show();

// Declare the Schema via a Java Bean

SparkSession sparkSession = SparkSession.builder().getOrCreate();

Dataset explicitDF = MongoSpark.load(jsc).toDF(Character.class);

explicitDF.printSchema();

// SQL

explicitDF.registerTempTable("characters");

Dataset centenarians = sparkSession.sql("SELECT name, age FROM characters WHERE age >= 100");

// Saving DataFrame

MongoSpark.write(centenarians).option("collection", "hundredClub").save();

MongoSpark.load(sparkSession, ReadConfig.create(sparkSession).withOption("collection", "hundredClub"), Character.class).show();

// Drop database

MongoConnector.apply(jsc.sc()).withDatabaseDo(ReadConfig.create(sparkSession), new Function() {

@Override

public Void call(final MongoDatabase db) throws Exception {

db.drop();

return null;

}

});

String objectId = "123400000000000000000000";

List docs = asList(

new Document("_id", new ObjectId(objectId)).append("a", 1),

new Document("_id", new ObjectId()).append("a", 2));

MongoSpark.save(jsc.parallelize(docs));

// Set the schema using the ObjectId helper

StructType schema = DataTypes.createStructType(asList(

StructFields.objectId("_id", false),

DataTypes.createStructField("a", DataTypes.IntegerType, false)));

// Create a dataframe with the helper functions registered

df = MongoSpark.read(sparkSession).schema(schema).option("registerSQLHelperFunctions", "true").load();

// Query using the ObjectId string

df.filter(format("_id = ObjectId('%s')", objectId)).show();

}

private static JavaSparkContext createJavaSparkContext(final String[] args) {

String uri = getMongoClientURI(args);

dropDatabase(uri);

SparkConf conf = new SparkConf()

.setMaster("local")

.setAppName("MongoSparkConnectorTour")

.set("spark.app.id", "MongoSparkConnectorTour")

.set("spark.mongodb.input.uri", uri)

.set("spark.mongodb.output.uri", uri);

return new JavaSparkContext(conf);

}

private static String getMongoClientURI(final String[] args) {

String uri;

if (args.length == 0) {

uri = "mongodb://localhost/test.coll"; // default

} else {

uri = args[0];

}

return uri;

}

private static void dropDatabase(final String connectionString) {

MongoClientURI uri = new MongoClientURI(connectionString);

new MongoClient(uri).dropDatabase(uri.getDatabase());

}

}



推荐阅读
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • javascript  – 概述在Firefox上无法正常工作
    我试图提出一些自定义大纲,以达到一些Web可访问性建议.但我不能用Firefox制作.这就是它在Chrome上的外观:而那个图标实际上是一个锚点.在Firefox上,它只概述了整个 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 解决VS写C#项目导入MySQL数据源报错“You have a usable connection already”问题的正确方法
    本文介绍了在VS写C#项目导入MySQL数据源时出现报错“You have a usable connection already”的问题,并给出了正确的解决方法。详细描述了问题的出现情况和报错信息,并提供了解决该问题的步骤和注意事项。 ... [详细]
  • Go GUIlxn/walk 学习3.菜单栏和工具栏的具体实现
    本文介绍了使用Go语言的GUI库lxn/walk实现菜单栏和工具栏的具体方法,包括消息窗口的产生、文件放置动作响应和提示框的应用。部分代码来自上一篇博客和lxn/walk官方示例。文章提供了学习GUI开发的实际案例和代码示例。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 展开全部下面的代码是创建一个立方体Thisexamplescreatesanddisplaysasimplebox.#Thefirstlineloadstheinit_disp ... [详细]
  • 本文介绍了一些Java开发项目管理工具及其配置教程,包括团队协同工具worktil,版本管理工具GitLab,自动化构建工具Jenkins,项目管理工具Maven和Maven私服Nexus,以及Mybatis的安装和代码自动生成工具。提供了相关链接供读者参考。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了StartingzookeeperFAILEDTOSTART相关的知识,希望对你有一定的参考价值。下载路径:https://ar ... [详细]
  • 本文介绍了如何在Mac上使用Pillow库加载不同于默认字体和大小的字体,并提供了一个简单的示例代码。通过该示例,读者可以了解如何在Python中使用Pillow库来写入不同字体的文本。同时,本文也解决了在Mac上使用Pillow库加载字体时可能遇到的问题。读者可以根据本文提供的示例代码,轻松实现在Mac上使用Pillow库加载不同字体的功能。 ... [详细]
  • JavaScript和HTML之间的交互是经由过程事宜完成的。事宜:文档或浏览器窗口中发作的一些特定的交互霎时。能够运用侦听器(或处置惩罚递次来预订事宜),以便事宜发作时实行相应的 ... [详细]
  • 无详细内容MySQLmysqlmysqlDELIMITERmysqlCREATEFUNCTIONmyProc(costDECIMAL(10,2))-RETURNSDECIMAL(1 ... [详细]
author-avatar
广东淡水未央
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有