In big-data development there are many tools for data synchronization, including DataX, Flink CDC, Spark, Canal, Sqoop, and others. This article describes how to use Spark to synchronize data across several kinds of data sources; the approach is based on JDBC drivers.
1 Scala Version
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.greenplum.spark</groupId>
    <artifactId>greenplum-spark-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>3.2.1</spark.version>
        <scala.version>2.12</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>io.pivotal.greenplum.spark</groupId>
            <artifactId>greenplum-spark_${scala.version}</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.pivotal</groupId>
            <artifactId>greenplum-jdbc</artifactId>
            <version>5.1.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.3-1102-jdbc4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
JDBCSource
package com.greenplum.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
//MysqlDriver:"com.mysql.cj.jdbc.Driver" (Connector/J 8.x; 5.x uses "com.mysql.jdbc.Driver")
//OracleDriver:"oracle.jdbc.driver.OracleDriver"
//GreenplumDriver:"com.pivotal.jdbc.GreenplumDriver"
//PostgresqlDriver:"org.postgresql.Driver"
//MysqlURL:"jdbc:mysql://localhost:3306/databaseName"
//OracleURL:"jdbc:oracle:thin:@//localhost:1521/serviceName"
//GreenplumURL:"jdbc:pivotal:greenplum://localhost:15432;DatabaseName=databaseName"
//PostgresqlURL:"jdbc:postgresql://localhost:5432/databaseName"
object JDBCSource {
  def createDF(session: SparkSession, param: Map[String, AnyRef]): DataFrame = {
    session.sqlContext.read
      .format("jdbc")
      .option("driver", param("driver").toString)
      .option("url", param("url").toString)
      .option("dbtable", param("dbtable").toString)
      .option("user", param("user").toString)
      .option("password", param("password").toString)
      .load()
  }
}
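The read above runs over a single JDBC connection. For large tables, Spark's built-in JDBC partitioning options (partitionColumn, lowerBound, upperBound, numPartitions, fetchsize) can parallelize the read. Below is a minimal sketch; the object name JDBCPartitionedSource, the column name id, and the bound values are placeholder assumptions, not part of the original code.

package com.greenplum.spark
import org.apache.spark.sql.{DataFrame, SparkSession}
object JDBCPartitionedSource {
  // Sketch: split the read across several JDBC connections by range on a numeric column.
  // "id", the bounds and the partition count are illustrative values; adjust to the real table.
  def createPartitionedDF(session: SparkSession, param: Map[String, AnyRef]): DataFrame = {
    session.read
      .format("jdbc")
      .option("driver", param("driver").toString)
      .option("url", param("url").toString)
      .option("dbtable", param("dbtable").toString)
      .option("user", param("user").toString)
      .option("password", param("password").toString)
      .option("partitionColumn", "id") // numeric/date/timestamp column to split on
      .option("lowerBound", "1")       // minimum value of the partition column
      .option("upperBound", "1000000") // maximum value of the partition column
      .option("numPartitions", "8")    // number of parallel JDBC connections
      .option("fetchsize", "10000")    // rows fetched per round trip
      .load()
  }
}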
JDBCSink
package com.greenplum.spark
import org.apache.spark.sql.DataFrame
//MysqlDriver:"com.mysql.cj.jdbc.Driver" (Connector/J 8.x; 5.x uses "com.mysql.jdbc.Driver")
//OracleDriver:"oracle.jdbc.driver.OracleDriver"
//GreenplumDriver:"com.pivotal.jdbc.GreenplumDriver"
//PostgresqlDriver:"org.postgresql.Driver"
//MysqlURL:"jdbc:mysql://localhost:3306/databaseName"
//OracleURL:"jdbc:oracle:thin:@//localhost:1521/serviceName"
//GreenplumURL:"jdbc:pivotal:greenplum://localhost:15432;DatabaseName=databaseName"
//PostgresqlURL:"jdbc:postgresql://localhost:5432/databaseName"
//saveMode
// - `overwrite`: overwrite the existing data.
// - `append`: append the data.
// - `ignore`: ignore the operation (i.e. no-op).
// - `error` or `errorifexists`: default option, throw an exception at runtime.
object JDBCSink {
  def save(df: DataFrame, saveMode: String, param: Map[String, AnyRef]): Unit = {
    df.write
      .mode(saveMode)
      .format("jdbc")
      .option("driver", param("driver").toString)
      .option("url", param("url").toString)
      .option("dbtable", param("dbtable").toString)
      .option("user", param("user").toString)
      .option("password", param("password").toString)
      .save()
  }
}
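The sink above relies on Spark's default JDBC write settings. batchsize, truncate and isolationLevel are standard Spark JDBC write options that are often worth setting explicitly. The sketch below is an assumption-based variant: the object name JDBCBatchSink and the option values are illustrative, not from the original article.

package com.greenplum.spark
import org.apache.spark.sql.DataFrame
object JDBCBatchSink {
  // Sketch: JDBC write with explicit batching and overwrite behavior; values are placeholders.
  def save(df: DataFrame, saveMode: String, param: Map[String, AnyRef]): Unit = {
    df.write
      .mode(saveMode)
      .format("jdbc")
      .option("driver", param("driver").toString)
      .option("url", param("url").toString)
      .option("dbtable", param("dbtable").toString)
      .option("user", param("user").toString)
      .option("password", param("password").toString)
      .option("batchsize", "5000")                // rows per JDBC batch insert
      .option("truncate", "true")                 // with overwrite: TRUNCATE instead of DROP/CREATE
      .option("isolationLevel", "READ_COMMITTED") // transaction isolation level for the write
      .save()
  }
}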
Test class JDBCMainTest. The test scenario is MySQL -> Greenplum; other data sources work the same way.
package com.greenplum.spark
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * @Description: Reads a table from MySQL and writes it to Greenplum via JDBC.
 * @Author: chenweifeng
 * @Date: 2022-08-17 15:27
 **/
object JDBCMainTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("greenplum_spark_test").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // MySQL / Oracle / PostgreSQL / Greenplum:
    // MySQL is used as the source here; for Oracle, PostgreSQL or Greenplum only the
    // driver/URL parameters need to change.
    // Read from MySQL and load the result into a DataFrame.
    val mysqlReadOptionMap = Map(
      "driver" -> "com.mysql.cj.jdbc.Driver",
      "url" -> "jdbc:mysql://100.89.9.20:3306/test_datax_gp?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai",
      "user" -> "root",
      "password" -> "MySql8_Root",
      "dbtable" -> "test_datax_gp_spark"
    )
    val mysqlDataFrame: DataFrame = JDBCSource.createDF(spark, mysqlReadOptionMap)
    mysqlDataFrame.show()

    // Write the result into Greenplum.
    val greenplumWriteOptionMap = Map(
      // "driver" -> "org.postgresql.Driver",
      // "url" -> "jdbc:postgresql://10.254.21.3:54432/pgbenchdb",
      "driver" -> "com.pivotal.jdbc.GreenplumDriver",
      "url" -> "jdbc:pivotal:greenplum://10.254.21.3:54432;DatabaseName=pgbenchdb",
      "user" -> "gpadmin",
      "password" -> "changeme",
      "dbschema" -> "public",
      "dbtable" -> "test_datax_gp_spark"
    )
    JDBCSink.save(mysqlDataFrame, SaveMode.Overwrite.name(), greenplumWriteOptionMap)

    sc.stop()
  }
}
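Note that pom.xml also pulls in the io.pivotal.greenplum.spark connector, which the test above never uses: the write goes through the generic JDBC sink. As a hedged sketch, writing the same DataFrame through the connector's "greenplum" data source would look roughly like the snippet below (placed inside the same main method, where mysqlDataFrame is in scope). The option names follow the connector's documentation and should be verified against the connector version actually used.

// Sketch (assumption): write via the Greenplum-Spark connector instead of plain JDBC.
// The connector expects a plain PostgreSQL-style JDBC URL; "dbschema" selects the target schema.
val gpConnectorOptions = Map(
  "url" -> "jdbc:postgresql://10.254.21.3:54432/pgbenchdb",
  "user" -> "gpadmin",
  "password" -> "changeme",
  "dbschema" -> "public",
  "dbtable" -> "test_datax_gp_spark"
)
mysqlDataFrame.write
  .format("greenplum")
  .options(gpConnectorOptions)
  .mode(SaveMode.Overwrite)
  .save()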
2 Java Version
To be updated...