Logstash是一个数据收集管道,配置输入输出,可将数据从一个地方传到另一个地方。同步mysql到Elasticsearch,这里的输入,指的是mysql,输出就是Elasticsearch。 新版本的Logstash和Elasticsearch跟之前老版本的有些不同,所以我也是自己折腾了小半天,总算成功。
下面的操作,Logstash和Elasticsearch的版本都是6.0.1
下载安装 Logstash和Elasticsearch就不介绍了,下载后解压即可使用。 默认Logstash不包含读取数据库的jdbc插件,需要手动下载。进入Logstash的bin目录,执行:
./logstash-plugin install logstash-input-jdbc
因为网络原因,安装可能费点时间。
在某处建一个目录,放置配置文件(哪不重要,因为执行时会配置路径),我这放到bin下的mysql目录里。
复制mysql jdbc驱动(mysql-connector-java-5.1.35.jar)到该目录下
编写导出数据的sql,放到sql.sql(文件名自己取)里,内容类似这样:
select *, id as car_id from violation_car car where car.gmt_modified>= :sql_last_value
这里需要介绍下,我的表里有个gmt_modified字段,用于记录该条记录的最后修改时间,sql_last_value是Logstash查询后,保存的上次查询时间,第一次查询时,该值是1970/1/1,所以第一次导入,如果你的表现有数据很多,可能会有点问题,后面会根据最后修改时间,更新修改过的数据。
编写输入输出配置文件jdbc.conf
input { jdbc { jdbc_driver_library => "/xxx/xxx/logstash-6.0.1/bin/mysql/mysql-connector-java-5.1.35.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://localhost:3306/violation" jdbc_user => "root" jdbc_password => "" schedule => "* * * * *" jdbc_default_timezone => "Asia/Shanghai" statement_filepath => "/xxx/xxx/logstash-6.0.1/bin/mysql/sql.sql" use_column_value => false last_run_metadata_path => "/xxx/xxx/logstash-6.0.1/bin/mysql/last_run.txt" } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "violation" document_id => "%{car_id}" document_type => "car" } stdout { codec => json_lines } }
字段介绍:
input.jdbc.jdbc_driver_library jdbc驱动的位置 input.jdbc.jdbc_driver_class 驱动类名 input.jdbc.jdbc_connection_string 数据库连接字符串 input.jdbc.jdbc_user 用户名 input.jdbc.jdbc_password 密码 input.jdbc.schedule 更新计划(参考linux crontab) input.jdbc.jdbc_default_timezone 时区,默认没有时区,日志里时间差8小时,中国需要用Asia/Shanghai input.jdbc.statement_filepath 导出数据的sql文件,就是上面写的 input.jdbc.use_column_value 如果是true,sql_last_value是tracking_column指定字段的数字值,false就是时间,默认是false input.jdbc.last_run_metadata_path 保存sql_last_value值文件的位置 output.elasticsearch.hosts elasticsearch服务器,填多个,请求会负载均衡。 output.elasticsearch.index 索引名 output.elasticsearch.document_id 生成文件的id,这里使用sql产生的car_id output.elasticsearch.document_type 文档类型 output.stdout 配置的是命令行输出,使用json
配置完以后,启动
./logstash -f mysql/jdbc.conf
按上面的配置,logstash会每分钟查询一次表,看是否会更新,有更新则提交到Elasticsearch