ES 迁移实战

ES 迁移实战

Cocytus Elias 54 2023-02-28

有时我们需要将 ES 迁移到新物理机上或容器云上,迁移方法千千万,目前在实际迁移中尝试使用 elasticsearch-dumplogstash ,基本可覆盖 90% 以上的迁移场景。

为了方便创建与销毁,整体示例我们采用容器方式来运行,示例基于 ES 5.6.10 版本。

ElasticsearchDump

elasticsearch-dump 适合以下场景:

  • 纯追加式数据 : 没有数据更新的需求,比如日志、统计数据、区块链历史数据等。
  • 可停机 : 停机指的是停止向老 ES 中写入,一般是对 实时数据/新数据 要求较低。

elasticsearch-dump 使用方式比较简单,不需要什么额外的配置,拿来即用。

镜像构建

主要是构建好 elasticsearch-dump 运行的环境,如果嫌麻烦或者本地已有 Node.js 环境,也可以直接 npm install -g elasticdump@latest

FROM node:14-buster-slim

RUN apt-get update
RUN apt-get install -y iputils-ping
RUN apt-get install -y curl

ENV NODE_ENV production

RUN npm install -g elasticdump@latest

CMD ["/bin/bash", "-c", "'while true; do sleep 1; done'"]

直接 docker build --platform linux/amd64 --tag elasticdump:latest . 构建镜像。

程序运行

镜像构建好之后,直接 docker run --name migration-elasticdump -d elasticdump:latest 运行镜像。

镜像启动后,直接 docker exec -t migration-elasticdump elasticdump --input=${SOURCE} --output=${TARGET} ,或者 docker exec -it migration-elasticdump /bin/bash 进入到容器内。

迁移命令示例如下:

# 迁移索引结构
elasticdump --input=${SOURCE}/${index} --output=${TARGET}/${index} --type=mapping
# 迁移索引数据
elasticdump --input=${SOURCE}/${index} --output=${TARGET}/${index} --type=data

# 比如我们从 HOST1 中迁移数据到 HOST2,迁移的是 users 索引。
# 首先迁移 users 索引结构
elasticdump --input=HOST1/users --output=HOST2/users --type=mapping
# 然后迁移 users 索引数据
elasticdump --input=HOST1/users --output=HOST2/users --type=data

# 迁移 users 索引数据默认是一次迁移 100 条,可以设置 limit 来提高单次的迁移数量。
# --limit 1000 代表一次迁移 1000 条数据。
elasticdump --input=HOST1/users --output=HOST2/users --type=data --limit 1000 

迁移

整体的迁移过程其实很简单。

  1. 终止原 ES 写入。
  2. 使用 elasticdump 进行同步,并测试数据无问题。
  3. ES 写入目标和服务读取目标都更改为新 ES

更多的迁移命令可以参考 elasticsearch-dump 上的,需要注意以下几点:

  1. elasticdump 会直接将原数据原封不动的迁移过来,包含 _id,所以不需要担心重复数据。
  2. elasticdump 需要先迁移 mapping 再迁移 data,默认是只迁移 data。如果存在分片、模版等,需要先迁移相关的设置或模版,具体可参考 elasticsearch-dump 上。
  3. elasticdumpElasticsearch 版本之间没有兼容性,并且新旧 ES 集群版本可以不一致或差异较大,但是需要 Elasticsearch 1.0.0 以上。
  4. elasticdump 会自动创建索引,所以不需要关注索引的创建。
  5. elasticdump 本地运行需要 Node.js 10.0.0 及以上才可。

更详细的使用方式可以参考 A practical guide to working with ElasticdumpGuide to Elasticdump - Moving and Saving Elasticsearch Indices

Logstash

Logstash 适合以下场景:

  • 更新式数据 : 有数据更新的需求,比如用户数据、商品数据、文章数据等。
  • 不停机 : 停机指的是停止向老 ES 中写入,一般是对 实时数据/新数据 要求较高。

镜像选择

LogstashES 官方给出的一个数据同步/传输方案。在使用 Logstash 时需要根据 ES 版本选择相应的版本。可以参考 版本对应关系

配置

Logstash 一般有三部分需要配置:

  • jvm: 主要配置 heap 内存。
  • logstash: 主要配置 logstash 的运行时参数。
  • pipeline: 主要配置数据管道的 input 端和 output 端。

JVM

就是正常的 jvm 配置项,一般情况下不需要调整,大数据量情况下可能需要调整下。

示例配置:

-Xmx16g
-Xms16g

Logstash

logstash 的运行时参数,整体配置可以参考 settings-file ,主要需要对 pipeline 进行配置下以提高写入性能。

 pipeline.workers: 1000  # 工作进程数量,默认值为 1,建议设置为 cpu 核数一致。
pipeline.output.workers: 1000 
pipeline.batch.size: 1000 # 每批次最大大小。
pipeline.batch.delay: 3 # 等待时间,毫秒。 

Pipeline

pipeline 的配置要复杂一些,整体分为三方面配置:

  • input: 数据源,也叫数据输入方,是必选的。
  • filter: 中间过滤层,主要是把 input 数据进行二次加工,是可选的。
  • output: 输出目标,也叫数据输出方,是必选的。

Logstash 的整体流程就是从 input 方拉取数据,在 filter 这里做下处理,然后输出给 output 方。

input

input 需要根据来源选择对应的插件,因为我们是 ES 迁移,所以需要使用 ES 插件。

具体配置如下:

input {
    elasticsearch {
        index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的,支持通配符 *。
        hosts => [${ESHost}] # 原 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
        size => 1000 # 每次拉取的最大条数。
        query => '{"query":{"range":{"createdAt":{"gte":"2022-10-21 00:00:00","lte":"2022-10-24 00:00:00"}}}}' # query 是筛选范围,就是正常的 es 查询语句。全量迁移的时候不建议选择,对于增量迁移可以设置。
        schedule => "* * * * *" # schedule 是任务执行间隔,格式和 corntab 一样,通常用在增量迁移上。
        docinfo => true # 拉取内容包含文档信息,即 _index、_id、_type。
        docinfo_fields => ["_index", "_id", "_type"] # 要传输的文档信息字段,与 docinfo 配合使用。为了避免数据重复,记得配置 _id。
    }
}

除此之外,也可以根据 文档 或自己的需求自行配置。

filter

filter 根据你的需求,可以选择不同的处理方式。

具体配置如下:

filter {
    mutate { # mutate 可以用来删除、替换、重命名字段。
        remove_field => ["@timestamp", "@version"] # 主要是用来删除自动生成的 @timestamp 和 @version。
    }
}

除此之外,也可以根据 文档 或自己的需求自行配置。

output

output 同样需要根据来源选择对应的插件,因为我们是 ES 迁移,所以同样需要使用 ES 插件。

具体配置如下:

output {
    elasticsearch {
        hosts => [${ESHost1},${ESHost2},${ESHost3}] # 新 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
        index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的。如果 index 是设置了通配符,这里可以设置为 "%{[@metadata][_index]}" 来区分多个 index。
        document_type => "%{[@metadata][_type]}" # 索引类型,直接去拿 input 那边传过来的。
        document_id => "%{[@metadata][_id]}" # 数据 id,也是直接去拿 input 那边传过来的。
    }
    stdout { codec => rubydebug { metadata => true } } # 这个是一个可选的标准输出插件,主要是用来调试用的 https://www.elastic.co/guide/en/logstash/5.6/plugins-outputs-stdout.html。
}

除此之外,也可以根据 文档 或自己的需求自行配置。

完整配置

完整配置如下所示:

input {
    elasticsearch {
        index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的,支持通配符 *。
        hosts => [${ESHost}] # 原 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
        size => 1000 # 每次拉取的最大条数。
        query => '{"query":{"range":{"createdAt":{"gte":"2022-10-21 00:00:00","lte":"2022-10-24 00:00:00"}}}}' # query 是筛选范围,就是正常的 es 查询语句。全量迁移的时候不建议选择,对于增量迁移可以设置。
        schedule => "* * * * *" # schedule 是任务执行间隔,格式和 corntab 一样,通常用在增量迁移上。
        docinfo => true # 拉取内容包含文档信息,即 _index、_id、_type。
        docinfo_fields => ["_index", "_id", "_type"] # 要传输的文档信息字段,与 docinfo 配合使用。为了避免数据重复,记得配置 _id。
    }
}

filter {
    mutate { # mutate 可以用来删除、替换、重命名字段。
        remove_field => ["@timestamp", "@version"] # 主要是用来删除自动生成的 @timestamp 和 @version。
    }
}

output {
    elasticsearch {
        hosts => [${ESHost1},${ESHost2},${ESHost3}] # 新 ES 服务,可以设置一个或多个节点的地址,但是这些节点必须属于同一个集群,使用时记得把 ${ESHost} 更换为自己的 ES 地址。
        index => "${index}" # 索引名称,记得把 ${index} 更换为你自己的。如果 index 是设置了通配符,这里可以设置为 "%{[@metadata][_index]}" 来区分多个 index。
        document_type => "%{[@metadata][_type]}" # 索引类型,直接去拿 input 那边传过来的。
        document_id => "%{[@metadata][_id]}" # 数据 id,也是直接去拿 input 那边传过来的。
    }
    stdout { codec => rubydebug { metadata => true } } # 这个是一个可选的标准输出插件,主要是用来调试用的 https://www.elastic.co/guide/en/logstash/5.6/plugins-outputs-stdout.html。
}

在配置时需要注意以下几点:

  1. inputsize 需要根据 ES 的实际线程数来定,如果超出就很容易出现 429 被拒绝写入。
  2. input 不配置 docinfodocinfo_fields 的话,是不会携带 idtype 的,写入到 output 时会自动生成 id,可能会导致会写入重复数据。
  3. 如果想要批量同步一批索引或全部索引,可以把 inputindex 设置为 * 或者 xxx*,匹配规则与 ES 一致。然后把 outputindex 设置为 %{[@metadata][_index]}
  4. 多数据源的重复数据处理可以参考 fingerprint filters,主要原理是对数据进行 HASH 来去重。

程序运行

配置文件 ok 之后就可以正式运行迁移服务了,文件放置可以参考 配置文件存放路径

容器内可以将配置文件放置在以下目录:

  • jvm 配置: /usr/share/logstash/config/jvm.options
  • logstash 运行配置: /usr/share/logstash/config/logstash.yml
  • pipline 迁移管道配置: /usr/share/logstash/config/logstash.conf

之后直接指定容器启动命令: logstash -f /usr/share/logstash/config/logstash.conf --path.settings /usr/share/logstash/config

迁移

抛去配置的过程外,整体的迁移过程也是比较简单。我们简单说下时序型的迁移流程。

  1. 设置存量同步的时间范围,根据不同颗粒度来迁移前一天或前一小时之前的数据。比如 2023-01-02 10:00:00 进行的迁移,那就迁移 2023-01-02 09:00:00 之前的存量数据。
  2. 设置增量同步任务,根据上一步设置的存量同步时间范围,来定时迁移之后的增量数据。比如 2023-01-02 10:00:00 进行的迁移,存量数据迁移 2023-01-02 09:00:00 之前的,那增量数据迁移 2023-01-02 09:00:00 及以后的。根据数量多寡,再决定增量迁移任务执行频率。
  3. 同步完毕后,并测试数据无问题。ES 写入目标和读取目标更改为新 ES ,关闭 Logstash

Other

  • 在迁移完成后,可以将程序设置为双写(同时写入新 ES 和老 ES,ES在新 ES 出现问题时可以直接切换回去 )。
  • 不论是 elasticsearch-dump 还是 Logstash ,使用它们是与数据量大小是没有关系的,两者都能同步大量数据, elasticsearch-dump 也可以实现断点传输,主要看你是否是能停机以及是否是对实时性比较敏感的数据。