文章目录
- 前言
- 一、原始实现
- 二、总觉得哪里不对劲?
- 三、 意想不到的收获
- 总结
前言
系统引入 Elasticsearch 来支持更高效的查询场景 (以下简称 ES). 就需要将数据库数据同步到 ES 中. 方案选择是分批从 DB 中读取数据再分批写入ES 中.
一、原始实现
读取:for(inti=0;i<times;i++){List<Order>saleList=salesDataGateway.batchList(query);writeEsKit.doWrite(saleList);query.setStartIndex(maxid+1);}publicclassOrderimplementsSerializable{@JsonProperty("geo_cd")privateStringgeoCd;@JsonProperty("fiscal_year")privateStringfiscalYear;}写入ES:BulkRequestbulk=newBulkRequest();bulk.timeout(timeOut);for(Orderitem:saleList){bulk.add(newIndexRequest(indexName).id(item.getOrderId()).source(JsonUtil.object2Json(item),XContentType.JSON));}Client.bulk(bulk);这段代码平稳运行了一年多,一张 index 最多要写入500多万数据,耗时约2小时.因为一条订单260多个字段,所以大家也都觉得就得这么长时间.汇报得时候也理直气壮.二、总觉得哪里不对劲?
总觉得这里怪怪的.代码逻辑先从数据库查出数据.又使用 @JsonProperty 来命名转换.最后格式化为 JSON 写入.而格式化这个操作在处理大实体类时是有性能损耗的.所以我想是不是可以有别的方式.比如去掉中间商赚差价....for(inti=0;i<times;i++){List<HashMap<String,Object>>saleList=salesDataGateway.batchListMap(query);writeEsKit.doWrite(saleList);query.setStartIndex(maxid+1);}BulkRequestbulk=newBulkRequest();bulk.timeout(timeOut);for(HashMap<String,Object>item:saleList){bulk.add(newIndexRequest(indexName).id(item.getOrderId()).source(item));}三、 意想不到的收获
ES 支持Map 写入.所以拿的就是Map,写入同样是Map.不兜圈子直接搞.写入耗时从2小时直接缩短到29分钟.觉得它耗时?没想到它这么耗时!!总结
为领导跟老板汇报提供了素材.