Data Migration
Data migration workflow
```
# 1. drop the old backup indices
DELETE /logger_mes_bak
DELETE /logger_mcs_bak
DELETE /logger_mcs_command_bak
DELETE /logger_mcs_exception_bak

# 2. recreate the backup index with bulk-load friendly settings (no replicas, refresh disabled)
PUT /logger_mcs_exception_bak
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "properties": {
      "carrier":    { "type": "keyword" },
      "createTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss" },
      "data":       { "type": "keyword" },
      "detail":     { "type": "keyword" },
      "device":     { "type": "keyword" },
      "logType":    { "type": "keyword" },
      "vehicle":    { "type": "keyword" }
    }
  }
}

# 3. copy the data into the backup index
POST /_reindex
{
  "source": { "index": "logger_mcs_exception" },
  "dest":   { "index": "logger_mcs_exception_bak" }
}

# 4. restore normal refresh / replica settings on the backup
PUT /logger_mcs_exception_bak/_settings
{
  "refresh_interval": "1s",
  "number_of_replicas": 1
}

# 5. drop the original index and recreate it with the same mapping
DELETE /logger_mcs_exception

PUT /logger_mcs_exception
{
  "settings": {
    "number_of_shards": 6,
    "number_of_replicas": 0,
    "refresh_interval": -1
  },
  "mappings": {
    "properties": {
      "carrier":    { "type": "keyword" },
      "createTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss" },
      "data":       { "type": "keyword" },
      "detail":     { "type": "keyword" },
      "device":     { "type": "keyword" },
      "logType":    { "type": "keyword" },
      "vehicle":    { "type": "keyword" }
    }
  }
}

# 6. copy the data back from the backup
POST /_reindex
{
  "source": { "index": "logger_mcs_exception_bak" },
  "dest":   { "index": "logger_mcs_exception" }
}

# 7. restore normal refresh / replica settings on the rebuilt index
PUT /logger_mcs_exception/_settings
{
  "refresh_interval": "1s",
  "number_of_replicas": 1
}
```
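Before dropping the original index (and again before dropping the backup), it is worth confirming that source and destination hold the same number of documents. This check is my addition rather than part of the flow above; the standard _count API is enough:

```
GET /logger_mcs_exception/_count

GET /logger_mcs_exception_bak/_count
```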
Reindex data migration
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-reindex.html#docs-reindex-api-request-body
script: run a script against each document during the reindex. It takes two options: source (the script body) and lang (the script language, painless by default). See the Request body section of the official docs for details.
```
POST /_reindex?wait_for_completion=false
{
  "source": { "index": "logger_mes" },
  "dest":   { "index": "logger_mes_bak" },
  "script": {
    "source": "ctx._source.remove(\"returnCode\");ctx._source.remove(\"returnMessage\")"
  }
}

GET _tasks/<task-id>

POST _tasks/<task-id>/_cancel
```
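The lang option is not used above because painless is the default; a minimal sketch that sets it explicitly (the migrated flag written here is purely illustrative, not a field from the real indices):

```
POST /_reindex?wait_for_completion=false
{
  "source": { "index": "logger_mes" },
  "dest":   { "index": "logger_mes_bak" },
  "script": {
    "lang": "painless",
    "source": "ctx._source.migrated = true"
  }
}
```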
The reindex source accepts a query, so you can control exactly which documents get migrated; see the Request body section of the official docs.
```
POST /_reindex
{
  "source": {
    "index": "logger_mes",
    "query": {
      "range": {
        "createTime": {
          "gte": "<start_time>",
          "lte": "<end_time>"
        }
      }
    }
  },
  "dest": { "index": "logger_mes_bak" }
}
```
Common APIs
delete by query
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/docs-delete-by-query.html
scroll_size: how many documents each scroll batch pulls into memory
slices: parallel slices; best set to auto so ES matches the slice count to the number of shards. Using more slices than shards actually degrades performance.
wait_for_completion: set to false to run asynchronously; the response is a task-id that you can query to track progress
refresh: refresh all affected shards once the request finishes
```
POST /<index_name>/_delete_by_query?scroll_size=5000&slices=10&wait_for_completion=false&refresh=true
{
  "query": {
    "range": {
      "date": {
        "lt": ""
      }
    }
  }
}
```
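With wait_for_completion=false the response only carries a task id. To follow a specific task, or to list every running delete-by-query (the actions filter below is the pattern shown in the official docs), something like:

```
GET _tasks/<task-id>

GET _tasks?detailed=true&actions=*/delete/byquery
```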
Things to watch when doing this from Java (RestHighLevelClient)
submitDeleteByQueryTask: wait_for_completion = false; asynchronous on the ES side, returns a task id that can be looked up later
deleteByQueryAsync: wait_for_completion = true; asynchronous only at the code level (client-side callback)
deleteByQuery: wait_for_completion = true; synchronous delete
```java
DeleteByQueryRequest request = new DeleteByQueryRequest(index);
request.setBatchSize(5000);                                  // sent as scroll_size on the wire (see below)
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);  // slices=auto
request.setRefresh(true);

TaskSubmissionResponse taskSubmissionResponse =
        esClient.submitDeleteByQueryTask(request, RequestOptions.DEFAULT);
```
Note that the BatchSize parameter corresponds to scroll_size. The Javadoc does say so, but the code is confusing at first glance, because the parameter is actually assigned like this:
```java
public DeleteByQueryRequest setBatchSize(int size) {
    getSearchRequest().source().size(size);
    return this;
}
```
So it sets the size parameter of the underlying search request. When I wrote my own implementation I had wanted to cap the amount of data handled per operation, the way you would limit a batch against a database, and since queries already expose this size parameter, reading the source left me with two questions: does deleteByQuery support size as a cap on the data volume, and are BatchSize and scroll_size really the same field? Here is the answer.
```java
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
    params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
}
```
Here you can see that when the delete-by-query request is built, the field's value is written to the scroll_size URL parameter. The field itself is the generic size of a search request, but the client wraps it for the delete implementation: requests are constructed differently for search, update and delete, so the same parameter ends up being applied at a different point in each case.
Total document count of a single index
```
GET /logger_moc/_search
{
  "track_total_hits": true
}
```
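If only the total matters, the _count API returns the same number without fetching any hits (an alternative I'm adding here; it is not used elsewhere in these notes):

```
GET /logger_moc/_count
```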
Checking the state of all indices
Node heap memory and shard settings
关于ES分片设置的6个建议 (6 tips on ES shard settings) - 掘金 (juejin.cn)
Before deciding on shard counts, estimate the index's storage footprint; ideally each shard should end up between 1 GB and 50 GB.
```
GET /_cat/nodes?v&h=heap*

GET /_nodes/stats/jvm?pretty

GET /_nodes/stats/jvm?filter_path=nodes.*.jvm.mem.heap_*
```
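For the 1-50 GB shard-size guidance you also need per-index and per-shard store sizes; these cat queries are my addition for that purpose and are not part of the original notes:

```
GET /_cat/indices?v&h=index,pri,rep,docs.count,store.size&s=store.size:desc

GET /_cat/shards?v&h=index,shard,prirep,store&s=store:desc
```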
Modifying cluster-level settings
```
GET _cluster/settings
```

persistent: these settings survive even a full cluster restart. They can be set through the API or in elasticsearch.yml.
transient: these settings last only until the cluster restarts; after a full restart they are cleared. They can only be set through the API.
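As an illustration of the persistent form (the setting shown here is just an example; it is not something the setup above relies on):

```
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.enable": "all"
  }
}
```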
Docker: recover a container's original docker run command with a helper image
https://hub.docker.com/r/cucker/get_command_4_run_container
```bash
docker pull cucker/get_command_4_run_container

echo "alias get_run_command='docker run --privileged --rm -v /var/run/docker.sock:/var/run/docker.sock cucker/get_command_4_run_container'" >> ~/.bashrc

. ~/.bashrc

get_run_command <container name>/<container ID>
```
ES ILM & rollover (index rolling)
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-index-lifecycle.html
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-rollover.html
ILM config
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-settings.html
```
# indices.lifecycle.poll_interval: how often ILM evaluates its policies
PUT _cluster/settings
{
  "transient": {
    "indices.lifecycle.poll_interval": "1m"
  }
}
```
ILM policy
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/ilm-rollover.html
The rollover API, combined with ILM, handles index lifecycle management automatically.
```
PUT _ilm/policy/<policy_name>
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "180d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
index template
```
PUT _template/<template_name>
{
  "index_patterns": [
    "<index_name-prefix>-*"
  ],
  "settings": {
    "index": {
      "lifecycle.name": "<policy_name>",
      "lifecycle.rollover_alias": "<your_alias>",
      "number_of_shards": "3"
    }
  },
  "mappings": {
    "properties": {
      .....
    }
  }
}
```
init index & set alias
is_write_index: marks the index that currently receives writes
```
PUT /<index_name>-000001
{
  "aliases": {
    "<your_alias>": {
      "is_write_index": true
    }
  }
}
```
The is_write_index parameter is documented under the rollover index API; there are several rollover scenarios, see: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-rollover-index.html#
Here one alias maps to multiple indices, so we have to mark which index is the current write target. We set "is_write_index": true at initialization; when a rollover is triggered, the new index gets "is_write_index": true and the old index is flipped to false.
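To verify which backing index currently holds the flag (a check I'm adding here; <your_alias> follows the placeholder convention used above):

```
GET _alias/<your_alias>
```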
Triggering a rollover manually
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-rollover-index.html#
```
POST /<your_alias>/_rollover
{
  "conditions": {
    "max_docs": 50000
  }
}
```
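To see which lifecycle phase the rolled indices are in (again my addition; the index pattern is an assumption based on the naming above):

```
GET /<index_name>-*/_ilm/explain
```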
ELK
filebeat
Configuration options: https://www.elastic.co/guide/en/beats/filebeat/7.9/configuring-howto-filebeat.html
Full reference config file: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-reference-yml.html
Multiline log matching in filebeat: https://www.elastic.co/guide/en/beats/filebeat/7.9/multiline-examples.html#multiline
```yaml
filebeat.inputs:
  - type: log
    paths:
      - /usr/share/filebeat/logs/mcs-service.log
    fields:
      type: mcs
    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after
  - type: log
    paths:
      - /usr/share/filebeat/logs/mes-service.log
    fields:
      type: mes
    multiline.type: pattern
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

ignore_older: 2h
close_inactive: 1m
scan_frequency: 10s

output.logstash:
  hosts: ["128.168.11.101:5044"]

processors:
  - drop_fields:
      fields: ["agent","ecs","host","input","log"]
```
```bash
docker pull elastic/filebeat:7.9.3

docker run -d \
  --name filebeat \
  --privileged=true \
  -v /u01/soft/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
  -v /u01/soft/logs:/usr/share/filebeat/logs \
  -w /usr/share/filebeat \
  elastic/filebeat:7.9.3
```
Handling multiple log sources in one collector
https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-input-log.html#filebeat-input-log-fields
Add custom fields on each input and let logstash branch on them for classification.
logstash
Logstash official docs: https://www.elastic.co/guide/en/logstash/7.9/input-plugins.html
```yaml
http.host: "127.0.0.1"
http.port: 9600
```
```
logback:  %date %level [%thread] %logger{36} [%file : %line] %msg%n
logstash: "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3}) %{LOGLEVEL:level} \[%{DATA:thread}\] %{JAVACLASS:javaClass} \[%{DATA:class}\] %{GREEDYDATA:data}"

logback:  %d{yyyy-MM-dd HH:mm:ss.SSS} %ip %level [%thread] [%file : %line] %msg%n
logstash: "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{IP:ip} %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}"
```
```
input {
  beats {
    port => 5044
  }
}

filter {
  if [fields][type] == "mcs" {
    grok {
      match => { "message" => "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{IP:ip} %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}" }
    }
  } else if [fields][type] == "mes" {
    grok {
      match => { "message" => "(?<log_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}) %{LOGLEVEL:level} \[%{DATA:thread}\] \[%{DATA:class}\] %{GREEDYDATA:data}" }
    }
  }
  date {
    match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  mutate {
    remove_field => ["@version","tags","message","log_time","thread"]
  }
}

output {
  if [fields][type] == "mcs" {
    elasticsearch {
      hosts => ["http://128.168.11.112:9200"]
      index => "mcslog-%{+YYYY.MM.dd}"
    }
  } else if [fields][type] == "mes" {
    elasticsearch {
      hosts => ["http://128.168.11.112:9200"]
      index => "meslog-%{+YYYY.MM.dd}"
    }
  }
}
```
```
# automatic index deletion policy
PUT _ilm/policy/logstash_auto_delete
{
  "policy": {
    "phases": {
      "delete": {
        "min_age": "7d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
Index template configuration: https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-templates-v1.html
```
PUT _template/mcslog
{
  "index_patterns": [
    "mcslog-*"
  ],
  "settings": {
    "index": {
      "lifecycle": {
        "name": "logstash_auto_delete"
      },
      "number_of_shards": "2"
    }
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level":      { "type": "text" },
      "ip":         { "type": "text" },
      "class":      { "type": "text" },
      "data":       { "type": "text" }
    }
  }
}
```
```bash
docker pull logstash:7.9.3

docker run -d \
  --name logstash \
  --privileged=true \
  -p 9600:9600 \
  -p 5044:5044 \
  -v /u01/soft/logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml \
  -v /u01/soft/logstash/pipeline:/usr/share/logstash/pipeline \
  -v /u01/soft/logstash/data:/usr/share/logstash/data \
  -w /usr/share/logstash \
  logstash:7.9.3
```
kibana
Kibana configuration: https://www.elastic.co/guide/en/kibana/7.9/settings.html
```yaml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://128.168.11.112:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: zh-CN
```
```bash
docker pull kibana:7.9.3

docker run -d \
  --name kibana \
  --privileged=true \
  -p 5601:5601 \
  -v /u01/soft/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
  -w /usr/share/kibana \
  kibana:7.9.3
```
Kibana advanced settings (Stack Management → Advanced Settings) for timestamp display:

```
dateFormat       YYYY-MM-DD HH:mm:ss.SSS
dateFormat:tz    Asia/Shanghai
```
Does logback log rotation conflict with filebeat's collection?
When logback rolls the log, it turns the current log file into our configured zip archive and creates a new log file; does that affect filebeat's collection?
https://www.jianshu.com/p/e98287437d41
See also the official docs: https://www.elastic.co/guide/en/beats/filebeat/7.9/filebeat-input-docker.html#filebeat-input-docker-close-inactive
and the related settings close_inactive, scan_frequency, etc.
In short, filebeat has a refresh mechanism and will not keep watching a file that gets no new content for a long time.
Image export and import
```bash
# export an image to a tar file
docker save -o <output path for the tar> <image id or image name:tag>

# upload the tar file to the target host, then import it
docker load -i <path of the uploaded tar>
```