你好呀,我是小羊。
如何快速的把数据库数据同步到es呢?之前我们介绍了使用flinkcdc的方案实时同步,读取binlog方式获取mysql 的数据变更,再写入es,今天我们介绍一个使用logstash定时同步mysql 到es 的方案。
1.简介
Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。
管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。如:json、multiline等
我们这次使用logstash的同步的原理和定时任务同步一样,logstash-jdbc 插件定时的从mysql 读取数据,然后输送到logstash 管道,再经过过滤处理,最终经过output 输出写入es。整个流程简单清晰。如果是业务逻辑比较简单的同步。可以采用logstash 方案,节省成本。
2.环境准备
-
准备一个mysql节点和jdbc jar 包 -
下载安装logstash -
下载安装elasticsearch
logstash 官网 https://www.elastic.co/cn/downloads/logstash
下载好后并解压
下载安装好es
es 的安装注意下系统配置
关闭swap
linux 下的 swap 类似于虚拟内存,如果开启了swap, 系统会在内存不够用的时候,拿一些硬盘资源当成内存使用,如果对于内存比较小的机器,这个还是有一些用的,但是它也有缺点,因为硬盘的速度是远远比不上内存的。所以它会影响整体的性能,对于生产环境的机器,内存一般比较大,所以关闭swap是比较好的一个选择,官方也是推荐关闭。
es jvm 内存为服务器一半,并且不超过 32g
es 底层是使用 lucene 来实现的,lucene 的核心技术就是倒排索引,倒排索引通俗的讲,就是将存储的文本分割成不同词组,然后把词组存储到内存中,并和存储的文本关联,查找的时候,如果找到了文字,再根据关联关系,找到存储的文本。所以,内存越大,内存存储的单词就越多,查询的也就越快。所以一般的生产机器的内存需要比较大的内存。比如32g 64g 等。linux 内存可以分2部分,一半给 es jvm 内存使用,另外一半给 lucene 和系统使用,注意,jvm 内存不要超过32g,JVM 在内存小于 32 GB 的时候会采用一个内存对象指针压缩技术.如果超过了 32g,jvm 就不再使用内存对象指针压缩了,会造成一些内存浪费,可以用这些内存多增加几个es 节点。更能提升es 集群的性能
ulmit max_map_count 65535ulimit 是linux 进程限制数量,因为es在运行的过程中会进行频繁的io 读写,所以 ulimit 太小会影响到 es 的性能,这里可以看下官方的说明,推荐我们把它调整到 65535
准备logstash 配置
input {
beats {
port => 5000
}
stdin {}
jdbc {
type => "jdbc"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://10.82.43.51:37201/coupon_center_2_1?characterEncoding=UTF8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=512&prepStmtCacheSqlLimit=8000"
# 数据库连接账号密码;
jdbc_user => "root"
jdbc_password => "root"
# MySQL依赖包路径;
jdbc_driver_library => "/app/logstash/config/mysql-connector-java-5.1.49.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库重连尝试次数
connection_retry_attempts => "3"
# 判断数据库连接是否可用,默认false不开启
jdbc_validate_connection => "true"
# 数据库连接可用校验超时时间,默认3600S
jdbc_validation_timeout => "3600"
# 开启分页查询(默认false不开启);
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
jdbc_page_size => "500"
# statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
# sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
# statement_filepath => "mysql/jdbc.sql"
#statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `orders` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
statement => "select order_id, order_date,customer_name,price,order_status FROM `orders` WHERE order_date>= :sql_last_value order by order_date asc"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names => false
# Value can be any of: fatal,error,warn,info,debug,默认info;
sql_log_level => warn
#
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
record_last_run => true
# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => "order_date"
# Value can be any of: numeric,timestamp,Default value is "numeric"
tracking_column_type => timestamp
# record_last_run上次数据存放位置;
last_run_metadata_path => "/app/logstash/config/last_id.txt"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
clean_run => false
#
# 同步频率(分 时 天 月 年),默认每分钟同步一次;
schedule => "*/1 * * * * *"
sequel_opts => {
login_timeout => "60"
prefetch_rows => "5000"
jdbc_properties =>
{
"defaultRowPrefetch" => "5000"
"loginTimeout" => "60"
"inactiveConnectionTimeout" => "120"
"timeoutCheckInterval" => "120"
"tcpKeepAlive" => "true"
"oracle.net.READ_TIMEOUT" => "5000"
"validationQuery" => "SELECT 1 FROM DUAL"
}
}
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
# convert 字段类型转换,将字段TotalMoney数据类型改为float;
mutate {
convert => {
"TotalMoney" => "float"
}
}
}
output {
elasticsearch {
hosts => "10.82.43.185:9200"
# 配置ES集群地址
# hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
# 索引名字,必须小写
index => "es_orders"
# 数据唯一索引(建议使用数据库KeyID)
document_id => "%{order_id}"
}
stdout {
codec => json_lines
}
}
input 是输入端的配置,主要是mysql连接信息以及jar 包的驱动 sql 之类。
filter 是logstash内部的组件,可以对查询的数据做一些操作,比如过滤字段,时间转换等等。
output 是输出端的配置,主要是一些es 相关信息。
执行命令
binlogstash.bat -f mysqljdbc.conf
就可以看到mysql 数据同步到es了。
3.配置化操作
logstash 支持把任务配置在配置文件。
在/config/pipelines.yml文件可以配置多个任务
- pipeline.id: order_logistics_item
path.config: "/app/logstash/config/order_logistics_item/order_logistics_item.conf"
也可以把sql 独立出来
#要执行的sql文件位置
statement_filepath =>"/app/logstash/config/order_bbc_info/order_bbc_info.sql"
order_bbc_info.sql
select
*
from order_bbc_info
如果是想同步多个表,可以配置多个 pipeline
4.增量同步处理
对于实时变化的数据,如果把变更的数据同步到es呢?可以在每个表里面配置一个更新时间,数据变更时,更新时间同步修改,logstash 根据变更时间就可以查询到最新的数据。logstash 支持把最后查询的数据保存到文件中,下次查询可以取到,再结合数据表变更时间,就可以做到增量同步。
保存上次查询最后一条数据的更新时间
#记录字段的文件位置和名称
last_run_metadata_path => "/app/logstash/config/order_bbc_info/jdbc_oracle_lastvalue.txt"
sql 中加查询条件,限制只查最新变更的数据,即可做到增量同步
where
obi.update_time > :sql_last_value
and obi.update_time < NOW()
order by obi.update_time asc
建议把时间往前多查一点,预防数据不一致的情况。
5.数据表多对一处理
设计es 索引的时候,经常会使用多张表做成一个较多字段的索引。减少因为关联关系的时间损失。
如果需要同步这样的数据,logstash 需要如何同步呢?增量同步该如何做?
答案是使用多个同步任务。同时同步不同的表到es
比如order 表和 order_operation 表是一对一关联,可以这样同步
- pipeline.id: order
path.config: "/app/logstash/config/order_header/order.conf"
- pipeline.id: order_operation
path.config: "/app/logstash/config/order_operation/order_operation.conf"
order.sql
select
*
from order o
where
:sql_last_value
and o.update_time < NOW()
order by o.update_time asc
order_operation.sql
select
*
from order o
left join order_operation oop on oop.order_id=o.id
where
oop.update_time > :sql_last_value
and oop.update_time < NOW()
order by oop.update_time asc
根据 2张表的更新时间,分别创建不同的同步任务,查询不同的数据然后再写入同样的es文档中。即可解决这个问题。
6.总结
logstash 同步数据还是比较简单方便的,也可以做到比较快的同步(秒级),适合对实时性不那么高的系统同步。相对于 canal + kafka ,flinkcdc+ kafka 等实时同步方案,它引入的组件很少,只有logstash,最大的优点就是开发运维成本低。可靠性高。缺点的话就是不太适合做复杂的处理,如果需要对mysql 数据做很复杂的业务处理再写入es.logstash支持的就比较少了。
7.往期精彩推荐
原文始发于微信公众号(小羊架构):使用logstash同步数据到ES,使用简单,容易维护
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/259969.html