# ES 数据迁移

背景:业务数据量很大,数据分布在各个 dc 下面,每个 dc 对应自己的 es cluster,现需要采集每个 dc 下 es cluster 中指定数据,这里指定是指采集特定 index 下指定的 metrics,需要通过 lucene 配合 DSL 筛选数据,现考虑两种方式。

# 方式一:使用后台 job + 脚本定时获取

基于我们已有的 job 集群环境,创建 Job 执行脚本(如 python 脚本),通过 API 方式采集各个 es 端数据源,并将这些数据存入我们的 es,这种方式可以同时将数据写一份到 oracle,但是因为数据量比较大,需要定期清理 oracle 表数据,而且如果清理时选择 DML 也就是 delete 操作的话并不能使 oracle 水位线降低,所以 delete 大表数据会影响后续表查询(扫描)性能,还可能要进行表重建,这也很麻烦,而且因为这个数据主要用来在 grafana 展示,写入 oracle 大概率只能说是备份近期数据,没有太多作用,所以可以省略该环节,并且之前提到过 grafana oracle 插件付费,grafana 无法直接对接 oracle 数据源也是一个因素。

附 python job 脚本

from elasticsearch import Elasticsearch, helpers
import pandas as pd
def es_migrate():
    client = Elasticsearch(hosts='https://xxx.xxx.com/esapi', http_auth=('username', 'password'))
    # 下面 DSL 可以从 kibana discover Inspect 打开复制过来,时间作为参数传进来,所以需要 job 每 run 一次记一次时间,然后下一次运行只需获取 last run time 到当前时间的
    query = {
        "query": {
            "bool": {
                "must": [
                    {
                        "query_string": {
                            "query": "metrics.appName:dajiangyou",
                            "analyze_wildcard": True,
                            "time_zone": "GMT+0"
                        }
                    }
                ],
                "filter": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": "2022-03-15T02:02:43.748Z",
                                "lte": "2022-03-15T04:02:43.748Z",
                                "format": "strict_date_optional_time"
                            }
                        }
                    }
                ],
                "should": [],
                "must_not": []
            }
        }
    }
    # client.search(index=metric['esindex'], body=dsl, request_timeout=300)
    # helpers.scan 返回一个 python 生成器 generator
    resp = helpers.scan(
        client,
        query=query,
        index=es_index_name,
        size=10000,
    )
    data = []  # 数据写入内存记录
    for num, doc in enumerate(resp):
        print('\n', num, '', doc)
        data.append(doc)
    # 转 pandas 的 DataFrame 格式,方便使用 to_sql 以 append 方式追加至 oracle 表
    df = pd.DataFrame(data)
    d_type = util.set_d_type_dict(data)  # 指定类型,优化存储性能,提速
    df.to_sql('table_name', engine, if_exists='append', index=False, dtype=d_type)
    
    es_host_list = ES_HOSTS.split(',')
    my_es = Elasticsearch(hosts=es_host_list, http_auth=(ES_USERNAME, ES_PASSWORD))
    documents = [{"_index": metric['storedtable'],
                  "_source": doc,
                  '_id': f"{doc['createtime']}_{doc['env']}_{doc['dc']}"}   # 下次插入同样 id 时数据覆盖
                 for doc in data]
    bulk(my_es, documents)

https://elasticsearch-py.readthedocs.io/en/v8.1.0/
https://kb.objectrocket.com/elasticsearch/how-to-use-python-helpers-to-bulk-load-data-into-an-elasticsearch-index
https://kb.objectrocket.com/elasticsearch/elasticsearch-and-scroll-in-python-953
https://elasticsearch-py.readthedocs.io/en/7.x/helpers.html#scan

# 方式二:logstash

通过在 /etc/logstash/conf.d/ 目录下添加 logstash 配置文件,自动拉去 es 数据源 output 到我们 es 集群。

input{
    elasticsearch {
        hosts => "https://xxx-xxx.xxx.com/esapi"
        index => "metrics-xxx-*"
        user => 'es_username'
        password => '************'
        query => '{ "query": { "bool": { "must": [{ "query_string": {"query": "metrics.xxx:xxxx OR metrics.xxx:xxxx"}  }]  }  } }'
        size => 500
        scroll => "5m"
        docinfo => true
        docinfo_target => "[@metadata][doc]"
        schedule => "*/5 * * * *"
    }
}


output{
    elasticsearch{
        hosts => ["xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200", "xx.xx.xx.xx:9200"]
        user => "our_es_username"
        password => "*************"
        index => "my_es_index_name"
        document_id => "%{[@metadata][doc][_id]}"
    }
}

# 总结

当前感觉 logstash 采集比较方便,所以可以选择方式二,如果需要数据录入 db,可以选择方式一,当然还有其他办法,比如通过 apache airflow 或者其他一些第三方开源项目都可以,但是第二种对于数据处理没有走程序灵活,而且通常我们从 es 里拿的都是聚合处理后的数据,所以两者各有优缺点

更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

Jalen Chu 微信支付

微信支付

Jalen Chu 支付宝

支付宝

Jalen Chu 公众号

公众号