• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

java操作ElasticSearch:批量操作

武飞扬头像
心有城府,腹有良谋
帮助1

pom文件

  1.  
    # 版本号
  2.  
    <version.elasticsearch-rest-high-level-client>6.5.0</version.elasticsearch-rest-high-level-client>
  3.  
     
  4.  
        <dependency>
  5.  
    <groupId>org.elasticsearch.client</groupId>
  6.  
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
  7.  
    <version>${version.elasticsearch-rest-high-level-client}</version>
  8.  
    </dependency>
  9.  
    <dependency>
  10.  
    <groupId>org.elasticsearch.client</groupId>
  11.  
    <artifactId>elasticsearch-rest-client</artifactId>
  12.  
    <version>${version.elasticsearch-rest-high-level-client}</version>
  13.  
    </dependency>
  14.  
    <dependency>
  15.  
    <groupId>org.elasticsearch</groupId>
  16.  
    <artifactId>elasticsearch</artifactId>
  17.  
    <version>${version.elasticsearch-rest-high-level-client}</version>
  18.  
    </dependency>

RestHighLevelClient(操作ES的客户端,使用时注入bean即可)

  1.  
    public RestHighLevelClient restHighLevelClient() {
  2.  
     
  3.  
    RestClientBuilder rclientBuilder = RestClient.builder(new HttpHost(host, port, "http"))
  4.  
    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
  5.  
    @Override
  6.  
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
  7.  
    return httpClientBuilder
  8.  
    .setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
  9.  
    }
  10.  
    });
  11.  
    return new RestHighLevelClient(rclientBuilder);

DeleteRequest批量删除

  1.  
    /**
  2.  
    * @Description: 根据es主键删除数据
  3.  
    * @Params:
  4.  
    * @Return:
  5.  
    * @Author: Mr.myq
  6.  
    * @Date: 2022/12/2011:04
  7.  
    */
  8.  
    @Override
  9.  
    public void deleteAllByIds(List<String> ids) {
  10.  
    if (!CollectionUtils.isEmpty(ids)) {
  11.  
    try {
  12.  
    List<List<String>> outherList = SubListUtil.splitList(ids, count);
  13.  
    for (List<String> innerList : outherList) {
  14.  
    //批量删除数据
  15.  
    BulkRequest request = new BulkRequest();
  16.  
    request.timeout("60s");
  17.  
    for (String id : innerList) {
  18.  
    // 类型json/_doc
  19.  
    DeleteRequest source = new DeleteRequest().index("索引").id(id).type(”_doc“);
  20.  
    request.add(source);
  21.  
    }
  22.  
    BulkResponse response = restHighLevelClient().bulk(request, RequestOptions.DEFAULT);
  23.  
    log.debug("ES批量删除数据 是否有失败 : " response.hasFailures());
  24.  
    }
  25.  
    } catch (IOException e) {
  26.  
    e.printStackTrace();
  27.  
    log.error("ES批量删除数据: " e.toString());
  28.  
    throw new RuntimeException("ES批量删除数据: {}", e);
  29.  
    } finally {
  30.  
    ids.clear();
  31.  
    }
  32.  
    }
  33.  
    }

BulkRequest批量新增

  1.  
    /**
  2.  
    * 批量更新es数据
  3.  
    *
  4.  
    * @param esDateList
  5.  
    */
  6.  
    private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
  7.  
    if (CollectionUtils.isEmpty(esDateList)) {
  8.  
    return;
  9.  
    }
  10.  
    BulkRequest bulkRequest = new BulkRequest();
  11.  
    bulkRequest.timeout("200s");
  12.  
    try {
  13.  
    for (int i = 0; i < esDateList.size(); i ) {
  14.  
    CmsGoodsEntity object = esDateList.get(i);
  15.  
    Map<String, Object> param = new LinkedHashMap<>();
  16.  
    String goodid = object.getGoodid();
  17.  
    param.put("tariffs_rate", object.getTariffsRate());
  18.  
    param.put("inspection_charges_fee", object.getInspectionChargesFee());
  19.  
    param.put("rates", object.getRates());
  20.  
    UpdateRequest updateRequest = new UpdateRequest(”索引“, "类型", goodid);
  21.  
    updateRequest.doc(param);
  22.  
    bulkRequest.add(updateRequest);
  23.  
    }
  24.  
    // 操作ES
  25.  
    BulkResponse bulk = restHighLevelClient().bulk(bulkRequest, RequestOptions.DEFAULT);
  26.  
    log.info("批量更新ES 是否有失败 {} ", bulk.hasFailures());
  27.  
    } catch (IOException e) {
  28.  
    e.printStackTrace();
  29.  
    throw new RuntimeException("***********批量更新ES数据异常***********");
  30.  
    } finally {
  31.  
    esDateList.clear();
  32.  
    }
  33.  
    }

DeleteByQueryRequest根据查询条件删除所有ES数据

  1.  
    /**
  2.  
    * @Description: 根据查看条件删除
  3.  
    * @Params:
  4.  
    * @Return:
  5.  
    * @Author: Mr.myq
  6.  
    * @Date: 2023/2/1415:25
  7.  
    */
  8.  
    @Override
  9.  
    public void deleteBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source) {
  10.  
    //通过QueryBuilders中的搜索逻辑
  11.  
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  12.  
    //1 设置条件
  13.  
    //设置删除条件: key = value
  14.  
    TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
  15.  
    if (!StringUtils.isEmpty(hisp)) {
  16.  
    TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
  17.  
    queryBuilder.must(hispTermQueryBuilder);
  18.  
    }
  19.  
    TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
  20.  
    queryBuilder.must(sourceTermQueryBuilder);
  21.  
    queryBuilder.must(supplierIdTermQueryBuilder);
  22.  
     
  23.  
    //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
  24.  
    DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
  25.  
    deleteByQueryRequest.setTimeout("6000s");
  26.  
    deleteByQueryRequest.setQuery(queryBuilder);
  27.  
    //指定删除索引
  28.  
    deleteByQueryRequest.indices(restClientConfig.getIndex());
  29.  
    deleteByQueryRequest.setConflicts("proceed");
  30.  
    try {
  31.  
    //3 通过deleteByQuery来发起删除请求
  32.  
    BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
  33.  
    if (deleteResponse.getDeleted() >= 1) {
  34.  
    log.info("deleteData,删除成功,删除文档条数: " deleteResponse.getDeleted() " ,indexName:" restClientConfig.getIndex());
  35.  
    }
  36.  
    } catch (IOException e) {
  37.  
    e.printStackTrace();
  38.  
    log.error("无法连接到ES目标服务器");
  39.  
    throw new TargetServerException("无法连接到ES目标服务器");
  40.  
    }
  41.  
    }

UpdateByQueryRequest批量更新

  1.  
    /**
  2.  
    * @Description: 根据查看条件更新
  3.  
    * @Params:
  4.  
    * @Return:
  5.  
    * @Author: Mr.myq
  6.  
    * @Date: 2023/2/1415:25
  7.  
    */
  8.  
    public void updateBySupplierIdAndHispAndSource(Integer supplierId, String hisp, Integer source,String value) {
  9.  
    //通过QueryBuilders中的搜索逻辑
  10.  
    BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
  11.  
    //1 设置条件
  12.  
    //设置删除条件: key = value
  13.  
    TermQueryBuilder supplierIdTermQueryBuilder = QueryBuilders.termQuery("supplier_id", supplierId);
  14.  
    if (!StringUtils.isEmpty(hisp)) {
  15.  
    TermQueryBuilder hispTermQueryBuilder = QueryBuilders.termQuery("hisp", hisp);
  16.  
    queryBuilder.must(hispTermQueryBuilder);
  17.  
    }
  18.  
    TermQueryBuilder sourceTermQueryBuilder = QueryBuilders.termQuery("source", source);
  19.  
    queryBuilder.must(sourceTermQueryBuilder);
  20.  
    queryBuilder.must(supplierIdTermQueryBuilder);
  21.  
     
  22.  
    //2 通过DeleteByQueryRequest来构建删除请求,setQuery来装载条件,indices来指定索引
  23.  
    UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest();
  24.  
    updateByQueryRequest.setTimeout("6000s");
  25.  
    updateByQueryRequest.setQuery(queryBuilder);
  26.  
     
  27.  
    // 这个实际上就是对应给脚本中传参数的对象。
  28.  
    HashMap<String, Object> params = new HashMap<>(16);
  29.  
    params.put("hobby", "修改的参数value");
  30.  
    final Script script = new Script(
  31.  
    ScriptType.INLINE, "painless",
  32.  
    "ctx._source.hobby = params.hobby",
  33.  
    params);
  34.  
    updateByQueryRequest.setScript(script);
  35.  
    //指定索引
  36.  
    updateByQueryRequest.indices(restClientConfig.getIndex());
  37.  
    updateByQueryRequest.setConflicts("proceed");
  38.  
    try {
  39.  
    //3 通过deleteByQuery来发起删除请求
  40.  
    BulkByScrollResponse deleteResponse = restClientConfig.restHighLevelClient().updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
  41.  
    if (deleteResponse.getUpdated() >= 1) {
  42.  
    log.info("更新成功,更新文档条数: " deleteResponse.getUpdated() " ,indexName:" restClientConfig.getIndex());
  43.  
    }
  44.  
    } catch (IOException e) {
  45.  
    e.printStackTrace();
  46.  
    log.error("无法连接到ES目标服务器");
  47.  
    throw new TargetServerException("无法连接到ES目标服务器");
  48.  
    }
  49.  
    }

BulkRequest批量删除

  1.  
     
  2.  
    /**
  3.  
    * 批量更新es数据
  4.  
    *
  5.  
    * @param esDateList
  6.  
    */
  7.  
    private void batchUpdateEsData(List<CmsGoodsEntity> esDateList) {
  8.  
    if (CollectionUtils.isEmpty(esDateList)) {
  9.  
    return;
  10.  
    }
  11.  
    BulkRequest bulkRequest = new BulkRequest();
  12.  
    bulkRequest.timeout("600s");
  13.  
    RestHighLevelClient restHighLevelClient = restClientConfig.restHighLevelClient();
  14.  
    try {
  15.  
    for (int i = 0; i < esDateList.size(); i ) {
  16.  
    CmsGoodsEntity object = esDateList.get(i);
  17.  
    Map<String, Object> param = new LinkedHashMap<>();
  18.  
    String goodid = object.getGoodid();
  19.  
    param.put("tariffs_rate", object.getTariffsRate());
  20.  
    param.put("inspection_charges_fee", object.getInspectionChargesFee());
  21.  
    param.put("rates", object.getRates());
  22.  
    UpdateRequest updateRequest = new UpdateRequest(restClientConfig.getIndex(), "_doc", goodid);
  23.  
    updateRequest.doc(param);
  24.  
    bulkRequest.add(updateRequest);
  25.  
    }
  26.  
    // 操作ES
  27.  
    BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  28.  
    log.warn("批量更新ES 是否有失败 {} ", bulk.hasFailures());
  29.  
    } catch (IOException e) {
  30.  
    log.error("批量更新ES数据异常,e", e);
  31.  
    e.printStackTrace();
  32.  
    throw new RuntimeException("ES批量更新数据异常:{}" e);
  33.  
    } finally {
  34.  
    try {
  35.  
    restHighLevelClient.close();
  36.  
    } catch (IOException e) {
  37.  
    e.printStackTrace();
  38.  
    }
  39.  
    esDateList.clear();
  40.  
    }
  41.  
    }

Script更新

  1.  
    public void updateHobby(String user, ESEntity esEntity) throws IOException {
  2.  
    final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("user", user));
  3.  
    // 这个实际上就是对应给脚本中传参数的对象。
  4.  
    HashMap<String, Object> params = new HashMap<>(16);
  5.  
    params.put("hobby", "和女朋友爬山");
  6.  
    final Script script = new Script(
  7.  
    ScriptType.INLINE, "painless",
  8.  
    "ctx._source.hobby = params.hobby",
  9.  
    params);
  10.  
    updateByQuery(queryBuilder, "索引名称", script);
  11.  
    }

出现: 版本冲突、文档类型不对、JAR包与使用的API不一致或其他问题。都可参考以下连接。

ElasticSearch超级实用API描述

以上代码需要变动一下,将一些参数替换掉。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaacei
系列文章
更多 icon
同类精品
更多 icon
继续加载