使用logstash同步数据到ES,使用简单,容易维护

使用logstash同步数据到ES,使用简单,容易维护


你好呀,我是小羊。

如何快速的把数据库数据同步到es呢?之前我们介绍了使用flinkcdc的方案实时同步,读取binlog方式获取mysql 的数据变更,再写入es,今天我们介绍一个使用logstash定时同步mysql 到es 的方案。

使用logstash同步数据到ES,使用简单,容易维护

1.简介

Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。

使用logstash同步数据到ES,使用简单,容易维护

管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。如:json、multiline等

使用logstash同步数据到ES,使用简单,容易维护我们这次使用logstash的同步的原理和定时任务同步一样,logstash-jdbc 插件定时的从mysql 读取数据,然后输送到logstash 管道,再经过过滤处理,最终经过output 输出写入es。整个流程简单清晰。如果是业务逻辑比较简单的同步。可以采用logstash 方案,节省成本。

2.环境准备

  1. 准备一个mysql节点和jdbc jar 包
  2. 下载安装logstash
  3. 下载安装elasticsearch

logstash 官网 https://www.elastic.co/cn/downloads/logstash

下载好后并解压使用logstash同步数据到ES,使用简单,容易维护

下载安装好es

es 的安装注意下系统配置

关闭swap
linux 下的 swap 类似于虚拟内存,如果开启了swap, 系统会在内存不够用的时候,拿一些硬盘资源当成内存使用,如果对于内存比较小的机器,这个还是有一些用的,但是它也有缺点,因为硬盘的速度是远远比不上内存的。所以它会影响整体的性能,对于生产环境的机器,内存一般比较大,所以关闭swap是比较好的一个选择,官方也是推荐关闭。

es jvm 内存为服务器一半,并且不超过 32g

es 底层是使用 lucene 来实现的,lucene 的核心技术就是倒排索引,倒排索引通俗的讲,就是将存储的文本分割成不同词组,然后把词组存储到内存中,并和存储的文本关联,查找的时候,如果找到了文字,再根据关联关系,找到存储的文本。所以,内存越大,内存存储的单词就越多,查询的也就越快。所以一般的生产机器的内存需要比较大的内存。比如32g 64g 等。linux 内存可以分2部分,一半给 es jvm 内存使用,另外一半给 lucene 和系统使用,注意,jvm 内存不要超过32g,JVM 在内存小于 32 GB 的时候会采用一个内存对象指针压缩技术.如果超过了 32g,jvm 就不再使用内存对象指针压缩了,会造成一些内存浪费,可以用这些内存多增加几个es 节点。更能提升es 集群的性能

ulmit max_map_count    65535ulimit 是linux 进程限制数量,因为es在运行的过程中会进行频繁的io 读写,所以 ulimit 太小会影响到 es 的性能,这里可以看下官方的说明,推荐我们把它调整到 65535

准备logstash 配置

input {
    beats {
      port => 5000
    }
    stdin {}
    jdbc {
        type => "jdbc"
         # 数据库连接地址
        jdbc_connection_string => "jdbc:mysql://10.82.43.51:37201/coupon_center_2_1?characterEncoding=UTF8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=512&prepStmtCacheSqlLimit=8000"
         # 数据库连接账号密码;
        jdbc_user => "root"
        jdbc_password => "root"
         # MySQL依赖包路径;
        jdbc_driver_library => "/app/logstash/config/mysql-connector-java-5.1.49.jar"
         # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
         # 数据库重连尝试次数
        connection_retry_attempts => "3"
         # 判断数据库连接是否可用,默认false不开启
        jdbc_validate_connection => "true"
         # 数据库连接可用校验超时时间,默认3600S
        jdbc_validation_timeout => "3600"
         # 开启分页查询(默认false不开启);
        jdbc_paging_enabled => "true"
         # 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);
        jdbc_page_size => "500"
         # statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;
         # sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;
         # statement_filepath => "mysql/jdbc.sql"
        #statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `orders` WHERE ModifyTime>= :sql_last_value order by ModifyTime asc"
        statement => "select order_id, order_date,customer_name,price,order_status FROM `orders` WHERE order_date>= :sql_last_value order by order_date asc"
         # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
        lowercase_column_names => false
         # Value can be any of: fatal,error,warn,info,debug,默认info;
        sql_log_level => warn
         #
         # 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;
        record_last_run => true
         # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;
        use_column_value => true
         # 需要记录的字段,用于增量同步,需是数据库字段
        tracking_column => "order_date"
         # Value can be any of: numeric,timestamp,Default value is "numeric"
        tracking_column_type => timestamp
         # record_last_run上次数据存放位置;
        last_run_metadata_path => "/app/logstash/config/last_id.txt"
         # 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;
        clean_run => false
         #
         # 同步频率(分 时 天 月 年),默认每分钟同步一次;
        schedule => "*/1 * * * * *"
        sequel_opts => {   
   login_timeout => "60"
   prefetch_rows => "5000"   
   jdbc_properties =>
    {
     "defaultRowPrefetch" => "5000"
     "loginTimeout" => "60"
     "inactiveConnectionTimeout" => "120"
     "timeoutCheckInterval" => "120"
     "tcpKeepAlive" => "true"
     "oracle.net.READ_TIMEOUT" => "5000"
     "validationQuery" => "SELECT 1 FROM DUAL"
    }
  }
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    # convert 字段类型转换,将字段TotalMoney数据类型改为float;
    mutate {
        convert => {
            "TotalMoney" => "float"
        }
    }
}
output {
    elasticsearch {
        hosts => "10.82.43.185:9200"
         # 配置ES集群地址
        # hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
         # 索引名字,必须小写
        index => "es_orders"
         # 数据唯一索引(建议使用数据库KeyID)
        document_id => "%{order_id}"
    }
    stdout {
        codec => json_lines
    }
}

input 是输入端的配置,主要是mysql连接信息以及jar 包的驱动 sql 之类。
filter 是logstash内部的组件,可以对查询的数据做一些操作,比如过滤字段,时间转换等等。
output 是输出端的配置,主要是一些es 相关信息。

执行命令

binlogstash.bat -f mysqljdbc.conf

就可以看到mysql 数据同步到es了。

3.配置化操作

logstash 支持把任务配置在配置文件。

在/config/pipelines.yml文件可以配置多个任务

- pipeline.id: order_logistics_item
  path.config: "/app/logstash/config/order_logistics_item/order_logistics_item.conf"

也可以把sql 独立出来

 #要执行的sql文件位置
  statement_filepath =>"/app/logstash/config/order_bbc_info/order_bbc_info.sql"

order_bbc_info.sql

select
   *
from order_bbc_info

如果是想同步多个表,可以配置多个 pipeline

4.增量同步处理

对于实时变化的数据,如果把变更的数据同步到es呢?可以在每个表里面配置一个更新时间,数据变更时,更新时间同步修改,logstash 根据变更时间就可以查询到最新的数据。logstash 支持把最后查询的数据保存到文件中,下次查询可以取到,再结合数据表变更时间,就可以做到增量同步。

保存上次查询最后一条数据的更新时间

    #记录字段的文件位置和名称
    last_run_metadata_path => "/app/logstash/config/order_bbc_info/jdbc_oracle_lastvalue.txt"

sql 中加查询条件,限制只查最新变更的数据,即可做到增量同步

where 
     obi.update_time > :sql_last_value 
     and obi.update_time <  NOW() 
 order by obi.update_time asc 

建议把时间往前多查一点,预防数据不一致的情况。

5.数据表多对一处理

设计es 索引的时候,经常会使用多张表做成一个较多字段的索引。减少因为关联关系的时间损失。

如果需要同步这样的数据,logstash 需要如何同步呢?增量同步该如何做?

答案是使用多个同步任务。同时同步不同的表到es

比如order 表和 order_operation 表是一对一关联,可以这样同步

- pipeline.id: order
  path.config: "/app/logstash/config/order_header/order.conf"
- pipeline.id: order_operation
  path.config: "/app/logstash/config/order_operation/order_operation.conf"

order.sql

 select
    *
from order o
where 
     :sql_last_value
     and o.update_time < NOW() 
 order by o.update_time asc 

order_operation.sql

 select
    *
from order o
left join order_operation oop on oop.order_id=o.id
where 
     oop.update_time > :sql_last_value
     and oop.update_time < NOW() 
 order by oop.update_time asc 

根据 2张表的更新时间,分别创建不同的同步任务,查询不同的数据然后再写入同样的es文档中。即可解决这个问题。

6.总结

logstash 同步数据还是比较简单方便的,也可以做到比较快的同步(秒级),适合对实时性不那么高的系统同步。相对于 canal + kafka ,flinkcdc+ kafka 等实时同步方案,它引入的组件很少,只有logstash,最大的优点就是开发运维成本低。可靠性高。缺点的话就是不太适合做复杂的处理,如果需要对mysql 数据做很复杂的业务处理再写入es.logstash支持的就比较少了。

7.往期精彩推荐

 


 国内免费使用chatGPT的方法

数据库实时同步方案

推荐一款开源的大屏报表项目


欢迎点赞转发给我鼓励~

使用logstash同步数据到ES,使用简单,容易维护使用logstash同步数据到ES,使用简单,容易维护关注我获取更多IT分享 使用logstash同步数据到ES,使用简单,容易维护使用logstash同步数据到ES,使用简单,容易维护



原文始发于微信公众号(小羊架构):使用logstash同步数据到ES,使用简单,容易维护

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/259969.html

(0)
Java朝阳的头像Java朝阳

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!