• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

ElasticSearch(分组统计,自动补全,数据同步)

武飞扬头像
上官玺
帮助3

1.分组统计

桶(bucket):

桶的作用,是按照某种方式对数据进行分组,每一组数据在ES中称为一个,ES中提供的划分桶的方式有很多:

  • Date Histogram Aggregation:根据日期阶梯分组,例如给定阶梯为周,会自动每周分为一组
  • Histogram Aggregation:根据数值阶梯分组,与日期类似,需要知道分组的间隔(interval)
  • Terms Aggregation:根据词条内容分组,词条内容完全匹配的为一组
  • Range Aggregation:数值和日期的范围分组,指定开始和结束,然后按段分组
  • ……

度量(metrics):

分组完成以后,我们一般会对组中的数据进行聚合运算,例如求平均值、最大、最小、求和等,这些在ES中称为度量

常见度量聚合方式:

  • Avg Aggregation:求平均值
  • Max Aggregation:求最大值
  • Min Aggregation:求最小值
  • Percentiles Aggregation:求百分比
  • Stats Aggregation:同时返回avg、max、min、sum、count等
  • Sum Aggregation:求和
  • Top hits Aggregation:求前几
  • Value Count Aggregation:求总数
  • ……

1-1.聚合为桶

统计价格在500元之内酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。

GET /hotel/_search
{
  "query":{ //搜索条件
      "range": {
          "price":{
            "lte": 500
          } 
      }
  }, 
  "size": 0, //不查询具体的数据  
  "aggs": { //声明这是一个聚合查询,是aggregations的缩写
    "brandAgg": { //给这次聚合起一个名字,可任意指定
      "terms": { //聚合的类型,这里选择terms,是根据词条内容(这里是品牌)划分
        "field": "brand", //按照哪个字段分组
        "size": 10, //显示多少条聚合结果  
        "order": {
           "_count": "asc" //分组之后可以根据数量排序
        }
      }
    }
  }
}
学新通
结果:
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    //省略
  },
  "hits" : {
    "total" : {
      "value" : 95,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ] //不显示搜索的数据结果
  },
  "aggregations" : {
    "brandAgg" : { //桶的名称
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 78,
      "buckets" : [
        {
          "key" : "万丽",
          "doc_count" : 1
        },
        {
          "key" : "万怡",
          "doc_count" : 1
        }
       //...省略
      ]
    }
  }
}

学新通

1-2.桶内度量

前面的例子告诉我们每个桶里面的文档数量,这很有用。 但通常,我们的应用需要提供更复杂的文档度量。 例如,每种品牌酒店的平均价格是多少?

因此,我们需要告诉ES使用哪个字段使用何种度量方式进行运算,这些信息要嵌套在内,度量的运算会基于内的文档进行

GET /hotel/_search
{
  "query":{
      "match_all": {}
  }, 
  "size": 0, //不查询具体的数据    
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand", //按照哪个字段分组
        "order": {
           "scoreAgg.avg": "desc" //根据指定的统计项排序
        }
      },
      "aggs":{  //是brands聚合的子聚合,也就是分组后对每组分别计算
          "scoreAgg": {//聚合名称
            "stats": {//聚合类型,这里stats可以计算min、max、avg等
              "field":"score"//聚合字段,这里是score
            }
          }
        }
    }
  }
}
学新通
结果:
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : { },
  "hits" : { },
  "aggregations" : {
    "brandAgg" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 111,
      "buckets" : [
        {
          "key" : "万丽",
          "doc_count" : 2,
          "scoreAgg" : {
            "count" : 2,
            "min" : 46.0,
            "max" : 47.0,
            "avg" : 46.5,
            "sum" : 93.0
          }
        },
        {
          "key" : "凯悦",
          "doc_count" : 8,
          "scoreAgg" : {
            "count" : 8,
            "min" : 45.0,
            "max" : 47.0,
            "avg" : 46.25,
            "sum" : 370.0
          }
        },
        //省略
        }
      ]
    }
  }
}
学新通

2.RestAPI

学新通
构建测试类AggsTest

    //聚合为桶
    @Test
    public void aggs() throws IOException {
        //1. 创建请求对象
        SearchRequest request = new SearchRequest("hotel");

        //2. 设置条件
        request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
        request.source().size(0);//不显示查询结果
        request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand")
                                     .size(10).order(BucketOrder.count(true)));//集合为桶

        //3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        //4. 处理结果
        //4-1获取聚合结果
        Terms brandTerm = response.getAggregations().get("brandAgg");
        //4-2 获取结果中的桶
        for (Terms.Bucket bucket : brandTerm.getBuckets()) {
            System.out.println(bucket.getKey()   ":"   bucket.getDocCount());
        }
    }
学新通

学新通

    //聚合为桶, 桶内度量
    @Test
    public void aggs2() throws IOException {
        //1. 创建请求对象
        SearchRequest request = new SearchRequest("hotel");

        //2. 设置条件
        request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
        request.source().size(0);//不显示查询结果
        request.source().aggregation(
                AggregationBuilders.terms("brandAgg").field("brand").size(10).order(BucketOrder.count(true))//集合为桶
                .subAggregation(AggregationBuilders.stats("scoreAgg").field("score"))//按照评分统计
        );

        //3. 发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        //4. 处理结果
        //4-1获取聚合结果
        Terms brandTerm = response.getAggregations().get("brandAgg");
        //4-2 获取结果中的桶
        for (Terms.Bucket bucket : brandTerm.getBuckets()) {
            ParsedStats  stats = (ParsedStats) bucket.getAggregations().getAsMap().get("scoreAgg");//获取统计结果
            System.out.println(bucket.getKey()   ":"   bucket.getDocCount() ":" stats.getAvg());
        }
    }
学新通

结果条件过滤

学新通
结果是一个Map结构:

  • key是字符串,城市、星级、品牌、价格
  • value是集合,例如多个城市的名称
    Controller
    //统计
    @PostMapping("/hotel/filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams requestParams) throws IOException {
        return hotelService.filters(requestParams);
    }

ServiceImpl

	//统计结果
    @Override
    public Map<String, List<String>> filters(RequestParams requestParams) throws IOException {
        //1. 构建查询请求
        SearchRequest request = new SearchRequest("hotel");

        //2. 设置查询条件
        //2-1 创建复合查询
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        //2-2 获取搜索关键字, 设置为查询条件
        String key = requestParams.getKey();
        if (StringUtils.isEmpty(key)) {
            boolQuery.must(QueryBuilders.matchAllQuery());
        } else {
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        }

        //2-3 获取城市、星级、品牌、价格,使用过滤语法筛选
        // 城市
        if (StrUtil.isNotEmpty(requestParams.getCity())) {
            boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
        }
        // 星级
        if (StrUtil.isNotEmpty(requestParams.getStarName())) {
            boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
        }
        // 品牌
        if (StrUtil.isNotEmpty(requestParams.getBrand())) {
            boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
        }
        // 价格
        if (requestParams.getMinPrice() != null && requestParams.getMaxPrice() != null) {
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(requestParams.getMinPrice()).lte(requestParams.getMaxPrice()));
        }

        request.source().query(boolQuery);//设置查询条件
        request.source().size(0);//不查询信息

        //3. 设置统计分析
        request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(50));
        request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(50));
        request.source().aggregation(AggregationBuilders.terms("starNameAgg").field("starName").size(50));


        //4. 发起请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        //5. 处理统计结果
        Aggregations aggregations = response.getAggregations();
        List<String> brandAgg = ((Terms) response.getAggregations().get("brandAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
        List<String> cityAgg = ((Terms) response.getAggregations().get("cityAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
        List<String> starNameAgg = ((Terms) response.getAggregations().get("starNameAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());

        Map<String, List<String>> map = new HashMap<>();
        map.put("brand",brandAgg);
        map.put("city",cityAgg);
        map.put("starName",starNameAgg);
        return map;
    }
学新通

数据同步

安装MQ

① 使用docker容器安装mq

docker run \
 -v mq-plugins:/plugins \
 --name mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

② 登录管理界面

浏览器访问:http://192.168.149.128:15672/

账号: guest

密码: guest

③ 配置虚拟主机、用户
例如:

账号: hotel
密码: hotel
主机: /hotel

学新通

声明交换机、队列

学新通

① 引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

② 配置文件

spring:
  rabbitmq:
    host: 192.168.149.128
    port: 5672
    virtual-host: /hotel
    username: hotel
    password: hotel

③ 声明队列交换机名称

public class MqConstants {
    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
学新通

④ 创建交换机和队列

@Configuration
public class MqConfiguration {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE);
    }

    // 声明第1个队列
    @Bean
    public Queue insertQueue() {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE);
    }

    // 队列1绑定交换机
    @Bean
    public Binding bindingInsertQueue(TopicExchange topicExchange, Queue insertQueue) {
        return BindingBuilder.bind(insertQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY);
    }

    // 声明第2个队列
    @Bean
    public Queue deleteQueue() {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE);
    }

    // 队列2绑定交换机
    @Bean
    public Binding bindingDeleteQueue(TopicExchange topicExchange, Queue deleteQueue) {
        return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY);
    }
}
学新通

发送MQ消息

学新通

接收MQ消息

@Component
public class HotelListener {

    @Autowired
    private RestHighLevelClient client;

    @Autowired
    private HotelMapper hotelMapper;

    // 新增、修改
    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void insertOrUpdateHotel(Long id) throws IOException {
        // 1.创建request
        IndexRequest request = new IndexRequest("hotel").id(id.toString());

        // 2.准备DSL
        Hotel hotel = hotelMapper.selectById(id);
        HotelDoc hotelDoc = new HotelDoc(hotel);
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);

        // 3.发送请求
        client.index(request, RequestOptions.DEFAULT);
        System.out.println("es同步新增或更新:"   id);
    }

    // 删除
    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void deleteHotel(Long id) throws IOException {
        // 1.创建request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());

        // 2.发送请求
        client.delete(request, RequestOptions.DEFAULT);
        System.out.println("es同步删除:"   id);
    }
}
学新通

搭建集群

修改系统配置

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vim /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

创建集群
编写一个docker-compose.yml文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9202:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9203:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge
学新通

在docker-compose.yml目录下执行下面命令,运行集群

docker-compose up -d

集群状态监控

学新通
创建索引库
使用head插件创建索引库,分片设置为3,每个分片设置1个副本
查看分片效果
回到首页,即可查看索引库分片效果:
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhifke
系列文章
更多 icon
同类精品
更多 icon
继续加载