如何使用scala将postgreSQL数据库连接到Apache Spark?

 Sunny-阿坚 发布于 2023-01-01 15:00

我想知道如何在scala中执行以下操作?

    使用Spark scala连接到postgreSQL数据库.

    编写SELECT,UPDATE等SQL查询来修改该数据库中的表.

我知道使用scala但是如何在打包时将psql scala的连接器jar导入到sbt中?

1 个回答
  • 我们的目标是从Spark工作者运行并行SQL查询.

    构建设置

    将连接器和JDBC添加到libraryDependenciesbuild.sbt.我只用MySQL试过这个,所以我会在我的例子中使用它,但Postgres应该是一样的.

    libraryDependencies ++= Seq(
      jdbc,
      "mysql" % "mysql-connector-java" % "5.1.29",
      "org.apache.spark" %% "spark-core" % "1.0.1",
      // etc
    )
    

    在创建时,SparkContext告诉它要将哪些罐子复制到执行器.包括连接器jar.一个好看的方式来做到这一点:

    val classes = Seq(
      getClass,                   // To get the jar with our own code.
      classOf[mysql.jdbc.Driver]  // To get the connector.
    )
    val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
    val conf = new SparkConf().setJars(jars)
    

    现在Spark已准备好连接到数据库.每个执行程序都将运行部分查询,以便结果可以进行分布式计算.

    有两种选择.较旧的方法是使用org.apache.spark.rdd.JdbcRDD:

    val rdd = new org.apache.spark.rdd.JdbcRDD(
      sc,
      () => {
        sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
      },
      "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
      0, 1000, 10,
      row => row.getString("BOOK_TITLE")
    )
    

    查看参数文档.简述:

    你有SparkContext.

    然后是一个创建连接的函数.这将在每个worker上调用以连接到数据库.

    然后是SQL查询.这必须与示例类似,并包含起始键和结束键的占位符.

    然后指定键的范围(在我的示例中为0到1000)和分区数.范围将在分区之间划分.因此,一个执行程序线程将最终SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100在示例中执行.

    最后我们有一个功能可以转换ResultSet为某些东西.在示例中,我们将其转换为a String,因此您最终会得到一个RDD[String].

    自Apache Spark版本1.3.0起,另一种方法可通过DataFrame API获得.而不是JdbcRDD你会创建一个org.apache.spark.sql.DataFrame:

    val df = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
      "dbtable" -> "BOOKS"))
    

    有关选项的完整列表,请参阅https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases(可以设置密钥范围和分区数量)喜欢JdbcRDD).

    更新

    JdbcRDD不支持更新.但你可以简单地做到foreachPartition.

    rdd.foreachPartition { it =>
      val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
      val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
      for (bookTitle <- it) {
        del.setString(1, bookTitle)
        del.executeUpdate
      }
    }
    

    (这会为每个分区创建一个连接.如果这是一个问题,请使用连接池!)

    DataFrame通过createJDBCTableinsertIntoJDBC方法支持更新.

    2023-01-01 15:02 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有