作者:Angle健少 | 来源:互联网 | 2023-09-25 11:51
文章目录01引言02实现2.1添加依赖2.2FlinkSQL2.3配置Kafka域名03文末01引言最近在做实时采集Kafka发布的内容到MySQL,本文记录一下关
文章目录
- 01 引言
- 02 实现
- 2.1 添加依赖
- 2.2 Flink SQL
- 2.3 配置Kafka域名
- 03 文末
01 引言
最近在做实时采集Kafka
发布的内容到MySQL
,本文记录一下关键的点,细节不再描述,希望能帮助到大家。
02 实现
2.1 添加依赖
在工程,除了添加基础的Flink
环境依赖,还需要添加flink-connector-kafka
的依赖:
<dependency><groupId>org.apache.flinkgroupId><artifactId>flink-connector-kafka_2.11artifactId><version>1.13.6version>
dependency>
除此&#xff0c;因为Flink
把Kafka
作为了Source
&#xff0c;所以读取的字符串是有解析方式的&#xff0c;本文主要使用的是“json
”的方式&#xff0c;因此还需要引入序列化包的&#xff0c;但是flink-connector-kafka
已经自带了&#xff0c;所以没必要再引入。
ok&#xff0c;到这里如果我们写好FlinkSQL
去启动&#xff0c;直接就会一闪而退了&#xff0c;为什么呢&#xff1f;因为我们缺少了’ kafka-clients-2.1.0.jar
&#39;这个包&#xff0c;但是也无需引入&#xff0c;因为在flink-connector-kafka
里面已经自带了。
为什么要在这里特别提示 “序列化包”和“kafka-clients包呢”&#xff1f;因为如果我们采用Flink On Yarn的方式部署时&#xff0c;这两个包是需要放到HDFS的&#xff0c;如下&#xff1a;
2.2 Flink SQL
好了&#xff0c;到了关键的FlinkSQL
了&#xff0c;该如何写呢&#xff1f;
首先看看Source
&#xff0c;也就是我们的Kafka
&#xff0c;如下&#xff1a;
CREATE TABLE t_student (id INT,name STRING
) WITH (&#39;connector&#39; &#61; &#39;kafka&#39;,&#39;topic&#39; &#61; &#39;cdc_user&#39;,&#39;properties.bootstrap.servers&#39; &#61; &#39;10.194.166.92:9092&#39;,&#39;properties.group.id&#39; &#61; &#39;flink-cdc-mysql-kafka&#39;,&#39;scan.startup.mode&#39; &#61; &#39;earliest-offset&#39;,&#39;format&#39; &#61; &#39;json&#39;
)
然后Sink
输出&#xff0c;我这里需要输出到MySQL
&#xff1a;
CREATE TABLE t_student_copy (id INT,name STRING,PRIMARY KEY (id) NOT ENFORCED) WITH (&#39;connector&#39; &#61; &#39;jdbc&#39;,&#39;url&#39; &#61; &#39;jdbc:mysql://127.0.0.1:3306/big_data&#39;,&#39;username&#39; &#61; &#39;root&#39;,&#39;password&#39; &#61; &#39;123456&#39;,&#39;table-name&#39; &#61; &#39;t_student_copy&#39;
)
最后&#xff0c;使用INSERT INTO
声明如何写入&#xff1a;
INSERT INTO t_student_copy(id,name) SELECT id,name FROM t_student
2.3 配置Kafka域名
还有一点需要注意的是&#xff0c;当我们跑Flink的程序的时候&#xff0c;会出现类似如下错误&#xff1a;
unable to connect broker…
这个时候&#xff0c;我们要在跑Flink
的程序的服务器配置Kafka
的域名&#xff0c;具体在hosts
文件里配置&#xff1a;
vi /etc/hosts
ok&#xff0c;到这里&#xff0c;只要我们只要使用Kafka
工具发送json
格式的数据&#xff0c;Flink
程序就能实时收到&#xff0c;并写入MySQL
数据库。
03 文末
本文主要是记录Kafka
如何实时写入到MySQL
的一些坑点&#xff0c;完整源码就不贴出来了&#xff0c;希望能给大家一点启示并帮助到大家&#xff0c;谢谢大家的阅读&#xff0c;本文完&#xff01;
附&#xff1a;KafkaTool的使用教程&#xff1a;
- https://www.cnblogs.com/miracle-luna/p/11299345.html
- https://www.cnblogs.com/frankdeng/p/9452982.html