java操作ElasticSearch:批量操作
pom文件
-
# 版本号
-
<version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>
-
-
<dependency>
-
<groupId>org.elasticsearch.client</groupId>
-
<artifactId>elasticsearch-rest-high-level-client</artifactId>
-
<version>${version.elasticsearch-rest-high-level-client}</version>
-
</dependency>
-
<dependency>
-
<groupId>org.elasticsearch.client</groupId>
-
<artifactId>elasticsearch-rest-client</artifactId>
-
<version>${version.elasticsearch-rest-high-level-client}</version>
-
</dependency>
-
<dependency>
-
<groupId>org.elasticsearch</groupId>
-
<artifactId>elasticsearch</artifactId>
-
<version>${version.elasticsearch-rest-high-level-client}</version>
-
</dependency>
RestHighLevelClient(操作ES的客户端,使用时注入bean即可)
-
public RestHighLevelClient restHighLevelClient() {
-
-
RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
-
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
-
-
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
-
return httpClientBuilder
-
.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
-
}
-
});
-
return new RestHighLevelClient(rclientBuilder);
DeleteRequest批量删除
-
/**
-
* @Description: 根据es主键删除数据
-
* @Params:
-
* @Return:
-
* @Author: Mr.myq
-
* @Date: 2022/12/2011:04
-
*/
-
-
public void deleteAllByIds(List<String> ids) {
-
if (!CollectionUtils.isEmpty(ids)) {
-
try {
-
List<List<String>> outherList = SubListUtil.splitList(ids, count);
-
for (List<String> innerList : outherList) {
-
//批量删除数据
-
BulkRequest request = new BulkRequest();
-
request.timeout("60s");
-
for (String id : innerList) {
-
// 类型json/_doc
-
DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
-
request.add(source);
-
}
-
BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
-
log.debug("ES批量删除数据 是否有失败 : " response.hasFailures());
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
log.error("ES批量删除数据: " e.toString());
-
throw new RuntimeException("ES批量删除数据: {}", e);
-
} finally {
-
ids.clear();
-
}
-
}
-
}
BulkRequest批量新增
-
/**
-
* 批量更新es数据
-
*
-
* @param esDateList
-
*/
-
private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
-
if (CollectionUtils.isEmpty(esDateList)) {
-
return;
-
}
-
BulkRequest bulkRequest = new BulkRequest();
-
bulkRequest.timeout("200s");
-
try {
-
for (int i = 0; i < esDateList.size(); i ) {
-
CmsGoodsEntity object = esDateList.get(i);
-
Map<String, Object> param = new LinkedHashMap<>();
-
String goodid = object.getGoodid();
-
param.put("tariffs_rate", object.getTariffsRate());
-
param.put("inspection_charges_fee", object.getInspectionChargesFee());
-
param.put("rates", object.getRates());
-
UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
-
updateRequest.doc(param);
-
bulkRequest.add(updateRequest);
-
}
-
// 操作ES
-
BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
-
log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
-
} catch (IOException e) {
-
e.printStackTrace();
-
throw new RuntimeException("***********批量更新ES数据异常***********");
-
} finally {
-
esDateList.clear();
-
}
-
}
DeleteByQueryRequest根据查询条件删除所有ES数据
-
/**
-
* @Description: 根据查看条件删除
-
* @Params:
-
* @Return:
-
* @Author: Mr.myq
-
* @Date: 2023/2/1415:25
-
*/
-
-
public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
-
//通过QueryBuilders中的搜索逻辑
-
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-
//1 设置条件
-
//设置删除条件: key = value
-
TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
-
if (!StringUtils.isEmpty(hisp)) {
-
TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
-
queryBuilder.must(hispTermQueryBuilder);
-
}
-
TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
-
queryBuilder.must(sourceTermQueryBuilder);
-
queryBuilder.must(supplierIdTermQueryBuilder);
-
-
//2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
-
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
-
deleteByQueryRequest.setTimeout("6000s");
-
deleteByQueryRequest.setQuery(queryBuilder);
-
//指定删除索引
-
deleteByQueryRequest.indices(restClientConfig.getIndex());
-
deleteByQueryRequest.setConflicts("proceed");
-
try {
-
//3 通过deleteByQuery来发起删除请求
-
BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
-
if (deleteResponse.getDeleted() >= 1) {
-
log.info("deleteData,删除成功,删除文档条数: " deleteResponse.getDeleted() " ,indexName:" restClientConfig.getIndex());
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
log.error("无法连接到ES目标服务器");
-
throw new TargetServerException("无法连接到ES目标服务器");
-
}
-
}
UpdateByQueryRequest批量更新
-
/**
-
* @Description: 根据查看条件更新
-
* @Params:
-
* @Return:
-
* @Author: Mr.myq
-
* @Date: 2023/2/1415:25
-
*/
-
public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
-
//通过QueryBuilders中的搜索逻辑
-
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
-
//1 设置条件
-
//设置删除条件: key = value
-
TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
-
if (!StringUtils.isEmpty(hisp)) {
-
TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
-
queryBuilder.must(hispTermQueryBuilder);
-
}
-
TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
-
queryBuilder.must(sourceTermQueryBuilder);
-
queryBuilder.must(supplierIdTermQueryBuilder);
-
-
//2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
-
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
-
updateByQueryRequest.setTimeout("6000s");
-
updateByQueryRequest.setQuery(queryBuilder);
-
-
// 这个实际上就是对应给脚本中传参数的对象。
-
HashMap<String, Object> params = new HashMap<>(16);
-
params.put("hobby", "修改的参数value");
-
final Script script = new Script(
-
ScriptType.INLINE, "painless",
-
"ctx._source.hobby = params.hobby",
-
params);
-
updateByQueryRequest.setScript(script);
-
//指定索引
-
updateByQueryRequest.indices(restClientConfig.getIndex());
-
updateByQueryRequest.setConflicts("proceed");
-
try {
-
//3 通过deleteByQuery来发起删除请求
-
BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
-
if (deleteResponse.getUpdated() >= 1) {
-
log.info("更新成功,更新文档条数: " deleteResponse.getUpdated() " ,indexName:" restClientConfig.getIndex());
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
log.error("无法连接到ES目标服务器");
-
throw new TargetServerException("无法连接到ES目标服务器");
-
}
-
}
BulkRequest批量删除
-
-
/**
-
* 批量更新es数据
-
*
-
* @param esDateList
-
*/
-
private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
-
if (CollectionUtils.isEmpty(esDateList)) {
-
return;
-
}
-
BulkRequest bulkRequest = new BulkRequest();
-
bulkRequest.timeout("600s");
-
RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
-
try {
-
for (int i = 0; i < esDateList.size(); i ) {
-
CmsGoodsEntity object = esDateList.get(i);
-
Map<String, Object> param = new LinkedHashMap<>();
-
String goodid = object.getGoodid();
-
param.put("tariffs_rate", object.getTariffsRate());
-
param.put("inspection_charges_fee", object.getInspectionChargesFee());
-
param.put("rates", object.getRates());
-
UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
-
updateRequest.doc(param);
-
bulkRequest.add(updateRequest);
-
}
-
// 操作ES
-
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
-
log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
-
} catch (IOException e) {
-
log.error("批量更新ES数据异常,e", e);
-
e.printStackTrace();
-
throw new RuntimeException("ES批量更新数据异常:{}" e);
-
} finally {
-
try {
-
restHighLevelClient.close();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
esDateList.clear();
-
}
-
}
Script更新
-
public void updateHobby(String user, ESEntity esEntity) throws IOException {
-
final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
-
// 这个实际上就是对应给脚本中传参数的对象。
-
HashMap<String, Object> params = new HashMap<>(16);
-
params.put("hobby", "和女朋友爬山");
-
final Script script = new Script(
-
ScriptType.INLINE, "painless",
-
"ctx._source.hobby = params.hobby",
-
params);
-
updateByQuery(queryBuilder, "索引名称", script);
-
}
出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。
以上代码需要变动一下,将一些参数替换掉。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgaacei
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24