ElasticSearch(分组统计,自动补全,数据同步)
1.分组统计
桶(bucket):
桶的作用
,是按照某种方式对数据进行分组
,每一组数据在ES中称为一个桶
,ES中提供的划分桶的方式有很多:
- Date Histogram Aggregation:根据
日期阶梯分组
,例如给定阶梯为周,会自动每周分为一组 - Histogram Aggregation:根据
数值阶梯分组
,与日期类似,需要知道分组的间隔(interval) - Terms Aggregation:根据
词条内容分组
,词条内容完全匹配的为一组 - Range Aggregation:
数值和日期
的范围分组,指定开始和结束,然后按段分组 - ……
度量(metrics):
分组完成以后,我们一般会对组中的数据进行聚合运算
,例如求平均值、最大、最小、求和等,这些在ES中称为度量
常见度量聚合方式:
- Avg Aggregation:求平均值
- Max Aggregation:求最大值
- Min Aggregation:求最小值
- Percentiles Aggregation:求百分比
- Stats Aggregation:同时返回avg、max、min、sum、count等
- Sum Aggregation:求和
- Top hits Aggregation:求前几
- Value Count Aggregation:求总数
- ……
1-1.聚合为桶
统计价格在500元之内酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
GET /hotel/_search
{
"query":{ //搜索条件
"range": {
"price":{
"lte": 500
}
}
},
"size": 0, //不查询具体的数据
"aggs": { //声明这是一个聚合查询,是aggregations的缩写
"brandAgg": { //给这次聚合起一个名字,可任意指定
"terms": { //聚合的类型,这里选择terms,是根据词条内容(这里是品牌)划分
"field": "brand", //按照哪个字段分组
"size": 10, //显示多少条聚合结果
"order": {
"_count": "asc" //分组之后可以根据数量排序
}
}
}
}
}
结果:
{
"took" : 4,
"timed_out" : false,
"_shards" : {
//省略
},
"hits" : {
"total" : {
"value" : 95,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ] //不显示搜索的数据结果
},
"aggregations" : {
"brandAgg" : { //桶的名称
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 78,
"buckets" : [
{
"key" : "万丽",
"doc_count" : 1
},
{
"key" : "万怡",
"doc_count" : 1
}
//...省略
]
}
}
}
1-2.桶内度量
前面的例子告诉我们每个桶里面的文档数量,这很有用。 但通常,我们的应用需要提供更复杂的文档度量。 例如,每种品牌酒店的平均价格是多少?
因此,我们需要告诉ES使用哪个字段
,使用何种度量方式
进行运算,这些信息要嵌套在桶
内,度量
的运算会基于桶
内的文档进行
GET /hotel/_search
{
"query":{
"match_all": {}
},
"size": 0, //不查询具体的数据
"aggs": {
"brandAgg": {
"terms": {
"field": "brand", //按照哪个字段分组
"order": {
"scoreAgg.avg": "desc" //根据指定的统计项排序
}
},
"aggs":{ //是brands聚合的子聚合,也就是分组后对每组分别计算
"scoreAgg": {//聚合名称
"stats": {//聚合类型,这里stats可以计算min、max、avg等
"field":"score"//聚合字段,这里是score
}
}
}
}
}
}
结果:
{
"took" : 6,
"timed_out" : false,
"_shards" : { },
"hits" : { },
"aggregations" : {
"brandAgg" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 111,
"buckets" : [
{
"key" : "万丽",
"doc_count" : 2,
"scoreAgg" : {
"count" : 2,
"min" : 46.0,
"max" : 47.0,
"avg" : 46.5,
"sum" : 93.0
}
},
{
"key" : "凯悦",
"doc_count" : 8,
"scoreAgg" : {
"count" : 8,
"min" : 45.0,
"max" : 47.0,
"avg" : 46.25,
"sum" : 370.0
}
},
//省略
}
]
}
}
}
2.RestAPI
构建测试类AggsTest
//聚合为桶
@Test
public void aggs() throws IOException {
//1. 创建请求对象
SearchRequest request = new SearchRequest("hotel");
//2. 设置条件
request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
request.source().size(0);//不显示查询结果
request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand")
.size(10).order(BucketOrder.count(true)));//集合为桶
//3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4. 处理结果
//4-1获取聚合结果
Terms brandTerm = response.getAggregations().get("brandAgg");
//4-2 获取结果中的桶
for (Terms.Bucket bucket : brandTerm.getBuckets()) {
System.out.println(bucket.getKey() ":" bucket.getDocCount());
}
}
//聚合为桶, 桶内度量
@Test
public void aggs2() throws IOException {
//1. 创建请求对象
SearchRequest request = new SearchRequest("hotel");
//2. 设置条件
request.source().query(QueryBuilders.rangeQuery("price").lte(500));//设置查询条件
request.source().size(0);//不显示查询结果
request.source().aggregation(
AggregationBuilders.terms("brandAgg").field("brand").size(10).order(BucketOrder.count(true))//集合为桶
.subAggregation(AggregationBuilders.stats("scoreAgg").field("score"))//按照评分统计
);
//3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//4. 处理结果
//4-1获取聚合结果
Terms brandTerm = response.getAggregations().get("brandAgg");
//4-2 获取结果中的桶
for (Terms.Bucket bucket : brandTerm.getBuckets()) {
ParsedStats stats = (ParsedStats) bucket.getAggregations().getAsMap().get("scoreAgg");//获取统计结果
System.out.println(bucket.getKey() ":" bucket.getDocCount() ":" stats.getAvg());
}
}
结果条件过滤
结果是一个Map结构:
- key是字符串,城市、星级、品牌、价格
- value是集合,例如多个城市的名称
Controller
//统计
@PostMapping("/hotel/filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams requestParams) throws IOException {
return hotelService.filters(requestParams);
}
ServiceImpl
//统计结果
@Override
public Map<String, List<String>> filters(RequestParams requestParams) throws IOException {
//1. 构建查询请求
SearchRequest request = new SearchRequest("hotel");
//2. 设置查询条件
//2-1 创建复合查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//2-2 获取搜索关键字, 设置为查询条件
String key = requestParams.getKey();
if (StringUtils.isEmpty(key)) {
boolQuery.must(QueryBuilders.matchAllQuery());
} else {
boolQuery.must(QueryBuilders.matchQuery("all", key));
}
//2-3 获取城市、星级、品牌、价格,使用过滤语法筛选
// 城市
if (StrUtil.isNotEmpty(requestParams.getCity())) {
boolQuery.filter(QueryBuilders.termQuery("city", requestParams.getCity()));
}
// 星级
if (StrUtil.isNotEmpty(requestParams.getStarName())) {
boolQuery.filter(QueryBuilders.termQuery("starName", requestParams.getStarName()));
}
// 品牌
if (StrUtil.isNotEmpty(requestParams.getBrand())) {
boolQuery.filter(QueryBuilders.termQuery("brand", requestParams.getBrand()));
}
// 价格
if (requestParams.getMinPrice() != null && requestParams.getMaxPrice() != null) {
boolQuery.filter(QueryBuilders.rangeQuery("price").gte(requestParams.getMinPrice()).lte(requestParams.getMaxPrice()));
}
request.source().query(boolQuery);//设置查询条件
request.source().size(0);//不查询信息
//3. 设置统计分析
request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(50));
request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(50));
request.source().aggregation(AggregationBuilders.terms("starNameAgg").field("starName").size(50));
//4. 发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//5. 处理统计结果
Aggregations aggregations = response.getAggregations();
List<String> brandAgg = ((Terms) response.getAggregations().get("brandAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
List<String> cityAgg = ((Terms) response.getAggregations().get("cityAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
List<String> starNameAgg = ((Terms) response.getAggregations().get("starNameAgg")).getBuckets().stream().map(e -> e.getKeyAsString()).collect(Collectors.toList());
Map<String, List<String>> map = new HashMap<>();
map.put("brand",brandAgg);
map.put("city",cityAgg);
map.put("starName",starNameAgg);
return map;
}
数据同步
安装MQ
① 使用docker容器安装mq
docker run \
-v mq-plugins:/plugins \
--name mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
② 登录管理界面
浏览器访问:http://192.168.149.128:15672/
账号: guest
密码: guest
③ 配置虚拟主机、用户
例如:
账号: hotel
密码: hotel
主机: /hotel
声明交换机、队列
① 引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
② 配置文件
spring:
rabbitmq:
host: 192.168.149.128
port: 5672
virtual-host: /hotel
username: hotel
password: hotel
③ 声明队列交换机名称
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/**
* 新增或修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
④ 创建交换机和队列
@Configuration
public class MqConfiguration {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(MqConstants.HOTEL_EXCHANGE);
}
// 声明第1个队列
@Bean
public Queue insertQueue() {
return new Queue(MqConstants.HOTEL_INSERT_QUEUE);
}
// 队列1绑定交换机
@Bean
public Binding bindingInsertQueue(TopicExchange topicExchange, Queue insertQueue) {
return BindingBuilder.bind(insertQueue).to(topicExchange).with(MqConstants.HOTEL_INSERT_KEY);
}
// 声明第2个队列
@Bean
public Queue deleteQueue() {
return new Queue(MqConstants.HOTEL_DELETE_QUEUE);
}
// 队列2绑定交换机
@Bean
public Binding bindingDeleteQueue(TopicExchange topicExchange, Queue deleteQueue) {
return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstants.HOTEL_DELETE_KEY);
}
}
发送MQ消息
接收MQ消息
@Component
public class HotelListener {
@Autowired
private RestHighLevelClient client;
@Autowired
private HotelMapper hotelMapper;
// 新增、修改
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void insertOrUpdateHotel(Long id) throws IOException {
// 1.创建request
IndexRequest request = new IndexRequest("hotel").id(id.toString());
// 2.准备DSL
Hotel hotel = hotelMapper.selectById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
System.out.println("es同步新增或更新:" id);
}
// 删除
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void deleteHotel(Long id) throws IOException {
// 1.创建request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
// 2.发送请求
client.delete(request, RequestOptions.DEFAULT);
System.out.println("es同步删除:" id);
}
}
搭建集群
修改系统配置
es运行需要修改一些linux系统权限,修改/etc/sysctl.conf
文件
vim /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
创建集群
编写一个docker-compose.yml文件,内容如下:
version: '2.2'
services:
es01:
image: elasticsearch:7.12.1
container_name: es01
environment:
- node.name=es01
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es02,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9202:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9203:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local
networks:
elastic:
driver: bridge
在docker-compose.yml目录下执行下面命令,运行集群
docker-compose up -d
集群状态监控
创建索引库
使用head插件创建索引库,分片设置为3,每个分片设置1个副本查看分片效果
回到首页,即可查看索引库分片效果:
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfhifke
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01