有时我们需要将 ES
迁移到新物理机上或容器云上,迁移方法千千万,目前在实际迁移中尝试使用 elasticsearch-dump 和 logstash ,基本可覆盖 90%
以上的迁移场景。
为了方便创建与销毁,整体示例我们采用容器方式来运行,示例基于 ES 5.6.10
版本。
ElasticsearchDump
elasticsearch-dump
适合以下场景:
- 纯追加式数据 : 没有数据更新的需求,比如日志、统计数据、区块链历史数据等。
- 可停机 : 停机指的是停止向老
ES
中写入,一般是对 实时数据/新数据 要求较低。
elasticsearch-dump
使用方式比较简单,不需要什么额外的配置,拿来即用。
镜像构建
主要是构建好 elasticsearch-dump
运行的环境,如果嫌麻烦或者本地已有 Node.js
环境,也可以直接 npm install -g elasticdump@latest
FROM node:14-buster-slim
RUN apt-get update
RUN apt-get install -y iputils-ping
RUN apt-get install -y curl
ENV NODE_ENV production
RUN npm install -g elasticdump@latest
CMD ["/bin/bash", "-c", "'while true; do sleep 1; done'"]
直接 docker build --platform linux/amd64 --tag elasticdump:latest .
构建镜像。
程序运行
镜像构建好之后,直接 docker run --name migration-elasticdump -d elasticdump:latest
运行镜像。
镜像启动后,直接 docker exec -t migration-elasticdump elasticdump --input=${SOURCE} --output=${TARGET}
,或者 docker exec -it migration-elasticdump /bin/bash
进入到容器内。
迁移命令示例如下:
# 迁移索引结构
elasticdump --input=${SOURCE}/${index} --output=${TARGET}/${index} --type=mapping
# 迁移索引数据
elasticdump --input=${SOURCE}/${index} --output=${TARGET}/${index} --type=data
# 比如我们从 HOST1 中迁移数据到 HOST2,迁移的是 users 索引。
# 首先迁移 users 索引结构
elasticdump --input=HOST1/users --output=HOST2/users --type=mapping
# 然后迁移 users 索引数据
elasticdump --input=HOST1/users --output=HOST2/users --type=data
# 迁移 users 索引数据默认是一次迁移 100 条,可以设置 limit 来提高单次的迁移数量。
# --limit 1000 代表一次迁移 1000 条数据。
elasticdump --input=HOST1/users --output=HOST2/users --type=data --limit 1000
迁移
整体的迁移过程其实很简单。
- 终止原
ES
写入。 - 使用
elasticdump
进行同步,并测试数据无问题。 ES
写入目标和服务读取目标都更改为新ES
。
更多的迁移命令可以参考 elasticsearch-dump 上的,需要注意以下几点:
elasticdump
会直接将原数据原封不动的迁移过来,包含_id
,所以不需要担心重复数据。elasticdump
需要先迁移mapping
再迁移data
,默认是只迁移data
。如果存在分片、模版等,需要先迁移相关的设置或模版,具体可参考 elasticsearch-dump 上。elasticdump
与Elasticsearch
版本之间没有兼容性,并且新旧ES
集群版本可以不一致或差异较大,但是需要Elasticsearch 1.0.0
以上。elasticdump
会自动创建索引,所以不需要关注索引的创建。elasticdump
本地运行需要Node.js 10.0.0
及以上才可。
更详细的使用方式可以参考 A practical guide to working with Elasticdump 或 Guide to Elasticdump - Moving and Saving Elasticsearch Indices。
Logstash
Logstash
适合以下场景:
- 更新式数据 : 有数据更新的需求,比如用户数据、商品数据、文章数据等。
- 不停机 : 停机指的是停止向老 ES 中写入,一般是对 实时数据/新数据 要求较高。
镜像选择
Logstash
是 ES
官方给出的一个数据同步/传输方案。在使用 Logstash
时需要根据 ES
版本选择相应的版本。可以参考 版本对应关系 。
配置
Logstash
一般有三部分需要配置:
jvm
: 主要配置heap
内存。logstash
: 主要配置logstash
的运行时参数。pipeline
: 主要配置数据管道的input
端和output
端。
JVM
就是正常的 jvm
配置项,一般情况下不需要调整,大数据量情况下可能需要调整下。
示例配置:
-Xmx16g
-Xms16g
Logstash
logstash
的运行时参数,整体配置可以参考 settings-file ,主要需要对 pipeline 进行配置下以提高写入性能。
pipeline.workers: 1000 # 工作进程数量,默认值为 1,建议设置为 cpu 核数一致。
pipeline.output.workers: 1000
pipeline.batch.size: 1000 # 每批次最大大小。
pipeline.batch.delay: 3 # 等待时间,毫秒。
Pipeline
pipeline
的配置要复杂一些,整体分为三方面配置:
input
: 数据源,也叫数据输入方,是必选的。filter
: 中间过滤层,主要是把input
数据进行二次加工,是可选的。output
: 输出目标,也叫数据输出方,是必选的。
Logstash
的整体流程就是从 input
方拉取数据,在 filter
这里做下处理,然后输出给 output
方。
input
input
需要根据来源选择对应的插件,因为我们是 ES
迁移,所以需要使用 ES
插件。
具体配置如下:
input {
elasticsearch {
index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的,支持通配符 *。
hosts => [${ESHost}] # 原 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
size => 1000 # 每次拉取的最大条数。
query => '{"query":{"range":{"createdAt":{"gte":"2022-10-21 00:00:00","lte":"2022-10-24 00:00:00"}}}}' # query 是筛选范围,就是正常的 es 查询语句。全量迁移的时候不建议选择,对于增量迁移可以设置。
schedule => "* * * * *" # schedule 是任务执行间隔,格式和 corntab 一样,通常用在增量迁移上。
docinfo => true # 拉取内容包含文档信息,即 _index、_id、_type。
docinfo_fields => ["_index", "_id", "_type"] # 要传输的文档信息字段,与 docinfo 配合使用。为了避免数据重复,记得配置 _id。
}
}
除此之外,也可以根据 文档 或自己的需求自行配置。
filter
filter 根据你的需求,可以选择不同的处理方式。
具体配置如下:
filter {
mutate { # mutate 可以用来删除、替换、重命名字段。
remove_field => ["@timestamp", "@version"] # 主要是用来删除自动生成的 @timestamp 和 @version。
}
}
除此之外,也可以根据 文档 或自己的需求自行配置。
output
output 同样需要根据来源选择对应的插件,因为我们是 ES 迁移,所以同样需要使用 ES 插件。
具体配置如下:
output {
elasticsearch {
hosts => [${ESHost1},${ESHost2},${ESHost3}] # 新 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的。如果 index 是设置了通配符,这里可以设置为 "%{[@metadata][_index]}" 来区分多个 index。
document_type => "%{[@metadata][_type]}" # 索引类型,直接去拿 input 那边传过来的。
document_id => "%{[@metadata][_id]}" # 数据 id,也是直接去拿 input 那边传过来的。
}
stdout { codec => rubydebug { metadata => true } } # 这个是一个可选的标准输出插件,主要是用来调试用的 https://www.elastic.co/guide/en/logstash/5.6/plugins-outputs-stdout.html。
}
除此之外,也可以根据 文档 或自己的需求自行配置。
完整配置
完整配置如下所示:
input {
elasticsearch {
index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的,支持通配符 *。
hosts => [${ESHost}] # 原 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
size => 1000 # 每次拉取的最大条数。
query => '{"query":{"range":{"createdAt":{"gte":"2022-10-21 00:00:00","lte":"2022-10-24 00:00:00"}}}}' # query 是筛选范围,就是正常的 es 查询语句。全量迁移的时候不建议选择,对于增量迁移可以设置。
schedule => "* * * * *" # schedule 是任务执行间隔,格式和 corntab 一样,通常用在增量迁移上。
docinfo => true # 拉取内容包含文档信息,即 _index、_id、_type。
docinfo_fields => ["_index", "_id", "_type"] # 要传输的文档信息字段,与 docinfo 配合使用。为了避免数据重复,记得配置 _id。
}
}
filter {
mutate { # mutate 可以用来删除、替换、重命名字段。
remove_field => ["@timestamp", "@version"] # 主要是用来删除自动生成的 @timestamp 和 @version。
}
}
output {
elasticsearch {
hosts => [${ESHost1},${ESHost2},${ESHost3}] # 新 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的。如果 index 是设置了通配符,这里可以设置为 "%{[@metadata][_index]}" 来区分多个 index。
document_type => "%{[@metadata][_type]}" # 索引类型,直接去拿 input 那边传过来的。
document_id => "%{[@metadata][_id]}" # 数据 id,也是直接去拿 input 那边传过来的。
}
stdout { codec => rubydebug { metadata => true } } # 这个是一个可选的标准输出插件,主要是用来调试用的 https://www.elastic.co/guide/en/logstash/5.6/plugins-outputs-stdout.html。
}
在配置时需要注意以下几点:
input
的size
需要根据ES
的实际线程数来定,如果超出就很容易出现429
被拒绝写入。input
不配置docinfo
和docinfo_fields
的话,是不会携带id
和type
的,写入到output
时会自动生成id
,可能会导致会写入重复数据。- 如果想要批量同步一批索引或全部索引,可以把
input
的index
设置为*
或者xxx*
,匹配规则与ES
一致。然后把output
的index
设置为%{[@metadata][_index]}
。 - 多数据源的重复数据处理可以参考 fingerprint filters,主要原理是对数据进行
HASH
来去重。
程序运行
配置文件 ok 之后就可以正式运行迁移服务了,文件放置可以参考 配置文件存放路径。
容器内可以将配置文件放置在以下目录:
jvm
配置:/usr/share/logstash/config/jvm.options
logstash
运行配置:/usr/share/logstash/config/logstash.yml
pipline
迁移管道配置:/usr/share/logstash/config/logstash.conf
之后直接指定容器启动命令: logstash -f /usr/share/logstash/config/logstash.conf --path.settings /usr/share/logstash/config
迁移
抛去配置的过程外,整体的迁移过程也是比较简单。我们简单说下时序型的迁移流程。
- 设置存量同步的时间范围,根据不同颗粒度来迁移前一天或前一小时之前的数据。比如
2023-01-02 10:00:00
进行的迁移,那就迁移2023-01-02 09:00:00
之前的存量数据。 - 设置增量同步任务,根据上一步设置的存量同步时间范围,来定时迁移之后的增量数据。比如
2023-01-02 10:00:00
进行的迁移,存量数据迁移2023-01-02 09:00:00
之前的,那增量数据迁移2023-01-02 09:00:00
及以后的。根据数量多寡,再决定增量迁移任务执行频率。 - 同步完毕后,并测试数据无问题。
ES
写入目标和读取目标更改为新ES
,关闭Logstash
。
Other
- 在迁移完成后,可以将程序设置为双写(同时写入新
ES
和老 ES,ES
在新ES
出现问题时可以直接切换回去 )。 - 不论是
elasticsearch-dump
还是Logstash
,使用它们是与数据量大小是没有关系的,两者都能同步大量数据,elasticsearch-dump
也可以实现断点传输,主要看你是否是能停机以及是否是对实时性比较敏感的数据。