[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007
概述
最近工作中有一个将MySql数据导入ES的需求,于是网上搜索了一下,发现不尽如人意。看了好几篇文章,还是不知道怎么做,在此记录一下自己成功的经验,没啥技术含量,但对于需要的人来说贼香…
方案
采用Logstash从MySql中抽取数据,然后导入ES中。原理如下图所示
环境准备
如果已经安装了mysql以及ES,可直接跳过以下部分。
安装Mysql/MariaDb
已经安装则跳过,此处采用docker安装。
- 创建本地卷
我通过下面的操作创建了一个目录: ~/software/mariadb/data
cd ~
mkdir -p software/mariadb/data
- 创建docker-compose文件
mariadb-dc.yaml
version: '3'
services:
mariadb:
image: mariadb:10.6.5
ports:
- 3306:3306
volumes:
- ~/software/mariadb/data:/var/lib/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007
- MYSQL_DATABASE=ss007_bd
其中有几点需要说明:
- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007
创建了一个叫ss007的用户
- 执行
在mariadb-dc.yaml
目录下执行
docker-compose -f mariadb-dc.yaml up -d
- 准备数据
数据库已经安装好,使用数据库管理工具,例如Navicat连接数据库并建表导入数据。
当然如果你习惯命令行,进入docker使用mysql命令行客户端操作也是可以的
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`stu_name` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名',
`age` int(11) NOT NULL DEFAULT 18 COMMENT '年龄',
`create_time` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '创建时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='学生表';
-- ----------------------------
-- Records of student
-- ----------------------------
INSERT INTO `student` VALUES (1, '王二狗', 30, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (2, '牛翠华', 25, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (3, '上官无雪', 18, '2022-02-19 10:57:16');
INSERT INTO `student` VALUES (4, '王婆', 50, '2022-02-19 10:57:16');
安装Elasticsearch与Kibana(可选)
已经安装则跳过,没安装请参考docker之如何使用docker来安装ELK(elasticsearch,logstash,kibana)
安装Logstash
下面将进入本文的关键部分。从官网下载了Logstash7.17.0的压缩包,解压即可。
写logstash配置文件
我们在config文件里新建一个名为my-demo-logstash.conf
的文件,内容如下
input {
jdbc {
jdbc_driver_library => "./mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ss007_db?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
# jdbc_paging_enabled => "true" #是否进行分页
# jdbc_page_size => "50000"
tracking_column => "create_time"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > :sql_last_value;"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => " 10 * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
# document_type => ""
index => "ss007_index"
hosts => ["localhost:9200"]
}
stdout{
codec => rubydebug
}
}
文件分为input 和output两部分,其实还可以有个filter。input 部分从mysql读取数据,output部分向ES插入数据,关键字段解释如下
input:
- jdbc_driver_library:mysql-connector-java.jar 的路径,mysql-connector-java.jar 需要我们去官网下载,用来连接mysql数据库的。我放到了与bin文件同一级别,所以使用
./
作为路径,你要根据你相对bin文件夹放置的位置来修改这个路径。 - tracking_column: mysql中的某一列,logstash用它来跟踪数据的变化。此处我用的是create_time列,那么当新增加一条记录,或者修改一下这个时间的话,logstash就会知道,从而更新数据。
- use_column_value :是否使用
tracking_column
的值作为:sql_last_value
的值。设置为true时,:sql_last_value = tracking_column
。设置为false,:sql_last_value= 上次查询时间
。 - statement:sql查询语句,其中
:sql_last_value
已经在上面介绍过了 - schedule:执行计划,可以通过它设置查询的执行计划,我这里设置每隔10秒执行一次
output:
如果ES中没有index,会自动创建
- document_id => “%{id}” : ES中document的唯一id,这里使用student的主键。
- index => “ss007_index” : ES 的索引
7.0以后不用设置document_type,默认doc
运行
导航到logstash路径下运行以下命令,注意7.x不能进入bin文件执行
sudo bin/logstash -f config/my-demo-logstash.conf
输出如下类似结果说明成功了
...
[2022-02-20T16:22:10,869][INFO ][logstash.inputs.jdbc ][main][4a56def37a98a3ccef919df855d4d915c4fa56a1212a5e6c99a82aa2a6e70df9] (0.009233s) SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERE create_time > 0;
{
"name" => "王婆",
"@timestamp" => 2022-02-20T08:22:10.938Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 4,
"@version" => "1",
"age" => 50
}
{
"name" => "王二狗",
"@timestamp" => 2022-02-20T08:22:10.927Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 1,
"@version" => "1",
"age" => 30
}
...
查看结果
从Kibana上查看ss007_index是否创建并导入了数据
更新数据
因为我们设置了每隔10秒执行一次,所以当我们插入新数据,或者更新数据导致create_time
列的日期增大,logstash就会感知到,随即更新ES中的数据
- 插入一条新数据时的输出
{
"name" => "张妈",
"@timestamp" => 2022-02-20T08:27:10.160Z,
"create_time" => 2022-02-19T21:57:16.000Z,
"id" => 5,
"@version" => "1",
"age" => 50
}
- 修改已有数据时的输出
{
"name" => "牛翠华",
"@timestamp" => 2022-02-20T08:22:10.937Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 2,
"@version" => "1",
"age" => 25
}
总结
搞这个我也是花了一天时间的,理论都知道,一动手就各种问题,如果它对你有帮助就点个赞再走…
参考文章:https://dzone.com/articles/migrating-mysql-data-to-elasticsearch-using-logsta
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/14662.html