0%

使用Logstash同步mysql到Elasticsearch 6.0.1

Logstash是一个数据收集管道,配置输入输出,可将数据从一个地方传到另一个地方。同步mysql到Elasticsearch,这里的输入,指的是mysql,输出就是Elasticsearch。 新版本的Logstash和Elasticsearch跟之前老版本的有些不同,所以我也是自己折腾了小半天,总算成功。

下面的操作,Logstash和Elasticsearch的版本都是6.0.1


  1. 下载安装 Logstash和Elasticsearch就不介绍了,下载后解压即可使用。 默认Logstash不包含读取数据库的jdbc插件,需要手动下载。进入Logstash的bin目录,执行:

    ./logstash-plugin install logstash-input-jdbc
    

    因为网络原因,安装可能费点时间。

  2. 在某处建一个目录,放置配置文件(哪不重要,因为执行时会配置路径),我这放到bin下的mysql目录里。

  3. 复制mysql jdbc驱动(mysql-connector-java-5.1.35.jar)到该目录下

  4. 编写导出数据的sql,放到sql.sql(文件名自己取)里,内容类似这样:

    select *, id as car_id 
    from violation_car car 
    where car.gmt_modified>= :sql_last_value
    

    这里需要介绍下,我的表里有个gmt_modified字段,用于记录该条记录的最后修改时间,sql_last_value是Logstash查询后,保存的上次查询时间,第一次查询时,该值是1970/1/1,所以第一次导入,如果你的表现有数据很多,可能会有点问题,后面会根据最后修改时间,更新修改过的数据。

  5. 编写输入输出配置文件jdbc.conf

    input {
      jdbc {
        jdbc_driver_library => "/xxx/xxx/logstash-6.0.1/bin/mysql/mysql-connector-java-5.1.35.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://localhost:3306/violation"
        jdbc_user => "root"
        jdbc_password => ""
        schedule => "* * * * *"
        jdbc_default_timezone => "Asia/Shanghai"
        statement_filepath => "/xxx/xxx/logstash-6.0.1/bin/mysql/sql.sql"
        use_column_value  => false
        last_run_metadata_path => "/xxx/xxx/logstash-6.0.1/bin/mysql/last_run.txt"
      }
    }
    output {
        elasticsearch {
            hosts => ["127.0.0.1:9200"]
            index => "violation"
            document_id => "%{car_id}"
            document_type => "car"
        }
        stdout {
            codec => json_lines
        }
    }
    

    字段介绍:

    input.jdbc.jdbc_driver_library  jdbc驱动的位置
    input.jdbc.jdbc_driver_class    驱动类名
    input.jdbc.jdbc_connection_string   数据库连接字符串
    input.jdbc.jdbc_user    用户名
    input.jdbc.jdbc_password  密码
    input.jdbc.schedule   更新计划(参考linux crontab)
    input.jdbc.jdbc_default_timezone   时区,默认没有时区,日志里时间差8小时,中国需要用Asia/Shanghai
    input.jdbc.statement_filepath 导出数据的sql文件,就是上面写的
    input.jdbc.use_column_value  如果是true,sql_last_value是tracking_column指定字段的数字值,false就是时间,默认是false
    input.jdbc.last_run_metadata_path  保存sql_last_value值文件的位置
    output.elasticsearch.hosts elasticsearch服务器,填多个,请求会负载均衡。
    output.elasticsearch.index 索引名
    output.elasticsearch.document_id   生成文件的id,这里使用sql产生的car_id
    output.elasticsearch.document_type  文档类型 
    output.stdout 配置的是命令行输出,使用json
    
  6. 配置完以后,启动

    ./logstash -f mysql/jdbc.conf
    

    按上面的配置,logstash会每分钟查询一次表,看是否会更新,有更新则提交到Elasticsearch