0%

Elasticsearch搭建

Elasticsearch 专题

ELK 名词解释

“ELK”是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch、Logstash 和 Kibana。

  • Elasticsearch 是一个搜索和分析引擎。
  • Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
  • Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。
    Elastic Stack 是 ELK Stack 的更新换代产品

elastic 官网

部署(docker)

注意:elk 各个版本要求对应,包括分词插件,否则存在各种问题

elasticsearch 部署

1
2
3
4
docker pull elasticsearch:7.2.0
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -d elasticsearch:7.2.0
# 检查
curl http://localhost:9200
  • 修改配置文件 /usr/share/elasticsearch/config/elasticsearch.yml 解决跨域、密码问题
1
2
3
4
5
6
7
8
9
10
11
cluster.name: "docker-cluster"
network.host: 0.0.0.0


http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers: Authorization, X-Requested-With, X-Auth-Token, Content-Type, Content-Length
http.cors.allow-credentials: true

xpack.security.enabled: true
  • 安装 ik 中文分词器
1
2
3
4
cd /usr/share/elasticsearch/plugins/
elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.2.0/elasticsearch-analysis-ik-7.2.0.zip
exit
docker restart elasticsearch

验证方法:kibana -> dev tools

1
2
3
4
5
POST test/_analyze
{
"analyzer": "ik_max_word",
"text": "你好我是东邪Jiafly"
}
  • 设置 elasticsearch 密码
1
2
3
4
5
docker exec -it elasticsearch /bin/bash
cd /usr/share/elasticsearch/bin
elasticsearch-setup-passwords interactive

# 默认用户名为 elastic

kibana 部署

1
2
3
4
docker pull kibana:7.2.0
docker run --name kibana --link=elasticsearch:test -p 5601:5601 -d kibana:7.2.0
docker start kibana
curl http://localhost:5601
  • 修改 kibana 配置文件 /usr/share/kibana/config/kibana.yml
1
2
3
4
5
6
7
8
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true

elasticsearch.username: "elastic"
elasticsearch.password: "123456"
i18n.locale: "zh-CN"

logstash 部署

部署需要同步的服务器上(如数据库服务器、文件服务器等)

1
2
3
4
5
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.2.0.tar.gz
tar zxvf logstash-7.2.0.tar.gz
mv logstash-7.2.0 logstash
cd logstash
bin/logstash -t -f config/logstash-sample.conf
  • 修改配置文件 logstash/config/logstash.yml
1
2
# 开启配置
path.config: ~/soft/logstash/config/conf.d/*.conf
  • 数据库配置 ~/soft/logstash/config/conf.d/logstash-mysql.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    input{
    stdin {
    }
    jdbc {
    # 驱动方式
    jdbc_driver_library => "~/soft/mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar"
    # 驱动类名
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql 数据库链接,blog为数据库名 &useSSL=false 这个是为了防止不支持ssl连接的问题
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/roi-youke?characterEncoding=utf8&useSSL=false"
    # 连接数据库用户名
    jdbc_user => "root"
    # 连接数据库密码
    jdbc_password => "123456"
    # 是否启用分页
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含>义为每分钟都更新
    schedule => "* * * * * "
    type => "jdbc"
    # 执行sql文路径及名称
    # statement_filepath => "~/soft/logstash/blog.sql"
    # 直接写sql语句用这个
    # statement => "select `article_detail`.article_id,article_title as title,article_detail_content as content from `article` LEFT JOIN `article_detail` ON `article`.`article_id` = `article_detail`.article_id"
    statement => "select * from user_game_record where record_id > :sql_last_value"
    use_column_value => true
    tracking_column => "record_id"
    # 保存上一次运行>的信息(tracking_column)
    last_run_metadata_path => "./logstash_jdbc_last_run"
    }
    }

    filter{
    json{
    source => "message"
    remove_field => ["message"]
    }
    }
    #output插件配置
    output{
    elasticsearch {
    #这里可以是数组,可以是多个节点的地址,会自动启用负载均衡
    hosts => ["192.168.31.24:9200"]
    #index名称
    index => "roi-youke.user_game_list"
    #document_type => "haoyebao" #文档类型
    #document_type =>"article"
    #文档id,必须设置,且表达式的变量存在,否则只>能插入一条记录
    document_id => "%{record_id}"
    }
    #控制台打印json
    stdout {
    codec => json_lines
    }
    }

mysql 驱动文件下载

1
2
wget http://ftp.jaist.ac.jp/pub/mysql/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
tar zxvf mysql-connector-java-8.0.27.tar.gz
  • 文件服务器配置 ~/soft/logstash/config/conf.d/logstash-file.conf
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    input {
    # beats {
    # port => 5044
    # }

    file {
    path => "/tmp/log/*"
    start_position => "beginning"
    }
    }

    output {
    elasticsearch {
    hosts => ["http://localhost:9200"]
    index => test
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    #user => "elastic"
    #password => "changeme"
    }
    }

elasticsearch-head 部署

  1. 优点:
  • 数据在分片的存储位置
  • dsl 领域特定语言 查询 结果表格化、智能化
  • 便于分析节点信息、索引信息等
  1. 缺点:
  • 安全性,登录密码完全暴露
  1. 安装
1
2
3
4
5
6
7
8
9
docker pull mobz/elasticsearch-head:5
docker create --name elasticsearch-head -p 9100:9100 mobz/elasticsearch-head:5
docker start elasticsearch-head

# 修改文件 /usr/src/app/_site/vendor.js 解决请求
# 将 application/x-www-form-urlencoded 改成 application/json;charset=UTF-8

# 访问 如果有密码,带后面参数
http://localhost:9100/?auth_user=elastic&auth_password=123456

框架集成 (springboot)

示例

配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# pom.xml
<spring.boot.version>2.3.0.RELEASE</spring.boot.version>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

# application.yml
spring:
elasticsearch:
rest:
uris: http://192.168.31.24:9200
username: elastic
password: 123456
Person.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.xmwefun.roicloud.pay.entity;

import com.xmwefun.roicloud.pay.utils.EsConsts;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.util.Date;

/**
* <p>
* 用户实体类
* </p>
*
* @author andy
* @date Created in 2018-12-20 17:29
*/
@Document(indexName = EsConsts.INDEX_NAME, type = EsConsts.TYPE_NAME, shards = 1, replicas = 0)
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {
/**
* 主键
*/
@Id
private Long id;

/**
* 名字
*/
@Field(type = FieldType.Keyword)
private String name;

/**
* 国家
*/
@Field(type = FieldType.Keyword)
private String country;

/**
* 年龄
*/
@Field(type = FieldType.Integer)
private Integer age;

/**
* 生日
*/
@Field(type = FieldType.Date)
private Date birthday;

/**
* 介绍
*/
@Field(type = FieldType.Text, analyzer = "ik_smart")
private String remark;
}
PersonRepository.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.xmwefun.roicloud.pay.repository;

import com.xmwefun.roicloud.pay.entity.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

import java.util.List;

/**
* @author fumingzhen
*/
public interface PersonRepository extends ElasticsearchRepository<Person, Long> {

/**
* 根据年龄区间查询
*
* @param min 最小值
* @param max 最大值
* @return 满足条件的用户列表
*/
List<Person> findByAgeBetween(Integer min, Integer max);
}
PersonRepositoryTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.xmwefun.roicloud.pay.repository;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.google.common.collect.Lists;
import com.xmwefun.roicloud.pay.RoiPayApplicationTest;
import com.xmwefun.roicloud.pay.entity.Person;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.ParsedAvg;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

import java.util.List;

@Slf4j
public class PersonRepositoryTest extends RoiPayApplicationTest {

@Test
public void findByAgeBetween() {
}

@Autowired
private PersonRepository repo;
@Autowired
private ElasticsearchRestTemplate esTemplate;

/**
* 测试新增
*/
@Test
public void save() {
Person person = new Person(1L, "刘备", "蜀国", 18, DateUtil.parse("1990-01-02 03:04:05"), "刘备(161年-223年6月10日),即汉昭烈帝(221年-223年在位),又称先主,字玄德,东汉末年幽州涿郡涿县(今河北省涿州市)人,西汉中山靖王刘胜之后,三国时期蜀汉开国皇帝、政治家。\n刘备少年时拜卢植为师;早年颠沛流离,备尝艰辛,投靠过多个诸侯,曾参与镇压黄巾起义。先后率军救援北海相孔融、徐州牧陶谦等。陶谦病亡后,将徐州让与刘备。赤壁之战时,刘备与孙权联盟击败曹操,趁势夺取荆州。而后进取益州。于章武元年(221年)在成都称帝,国号汉,史称蜀或蜀汉。《三国志》评刘备的机权干略不及曹操,但其弘毅宽厚,知人待士,百折不挠,终成帝业。刘备也称自己做事“每与操反,事乃成尔”。\n章武三年(223年),刘备病逝于白帝城,终年六十三岁,谥号昭烈皇帝,庙号烈祖,葬惠陵。后世有众多文艺作品以其为主角,在成都武侯祠有昭烈庙为纪念。");
Person save = repo.save(person);
log.info("【save】= {}", save);
}

/**
* 测试批量新增
*/
@Test
public void saveList() {
List<Person> personList = Lists.newArrayList();
personList.add(new Person(2L, "曹操", "魏国", 20, DateUtil.parse("1988-01-02 03:04:05"), "曹操(155年-220年3月15日),字孟德,一名吉利,小字阿瞒,沛国谯县(今安徽亳州)人。东汉末年杰出的政治家、军事家、文学家、书法家,三国中曹魏政权的奠基人。\n曹操曾担任东汉丞相,后加封魏王,奠定了曹魏立国的基础。去世后谥号为武王。其子曹丕称帝后,追尊为武皇帝,庙号太祖。\n东汉末年,天下大乱,曹操以汉天子的名义征讨四方,对内消灭二袁、吕布、刘表、马超、韩遂等割据势力,对外降服南匈奴、乌桓、鲜卑等,统一了中国北方,并实行一系列政策恢复经济生产和社会秩序,扩大屯田、兴修水利、奖励农桑、重视手工业、安置流亡人口、实行“租调制”,从而使中原社会渐趋稳定、经济出现转机。黄河流域在曹操统治下,政治渐见清明,经济逐步恢复,阶级压迫稍有减轻,社会风气有所好转。曹操在汉朝的名义下所采取的一些措施具有积极作用。\n曹操军事上精通兵法,重贤爱才,为此不惜一切代价将看中的潜能分子收于麾下;生活上善诗歌,抒发自己的政治抱负,并反映汉末人民的苦难生活,气魄雄伟,慷慨悲凉;散文亦清峻整洁,开启并繁荣了建安文学,给后人留下了宝贵的精神财富,鲁迅评价其为“改造文章的祖师”。同时曹操也擅长书法,唐朝张怀瓘在《书断》将曹操的章草评为“妙品”。"));
personList.add(new Person(3L, "孙权", "吴国", 19, DateUtil.parse("1989-01-02 03:04:05"), "孙权(182年-252年5月21日),字仲谋,吴郡富春(今浙江杭州富阳区)人。三国时代孙吴的建立者(229年-252年在位)。\n孙权的父亲孙坚和兄长孙策,在东汉末年群雄割据中打下了江东基业。建安五年(200年),孙策遇刺身亡,孙权继之掌事,成为一方诸侯。建安十三年(208年),与刘备建立孙刘联盟,并于赤壁之战中击败曹操,奠定三国鼎立的基础。建安二十四年(219年),孙权派吕蒙成功袭取刘备的荆州,使领土面积大大增加。\n黄武元年(222年),孙权被魏文帝曹丕册封为吴王,建立吴国。同年,在夷陵之战中大败刘备。黄龙元年(229年),在武昌正式称帝,国号吴,不久后迁都建业。孙权称帝后,设置农官,实行屯田,设置郡县,并继续剿抚山越,促进了江南经济的发展。在此基础上,他又多次派人出海。黄龙二年(230年),孙权派卫温、诸葛直抵达夷州。\n孙权晚年在继承人问题上反复无常,引致群下党争,朝局不稳。太元元年(252年)病逝,享年七十一岁,在位二十四年,谥号大皇帝,庙号太祖,葬于蒋陵。\n孙权亦善书,唐代张怀瓘在《书估》中将其书法列为第三等。"));
personList.add(new Person(4L, "诸葛亮", "蜀国", 16, DateUtil.parse("1992-01-02 03:04:05"), "诸葛亮(181年-234年10月8日),字孔明,号卧龙,徐州琅琊阳都(今山东临沂市沂南县)人,三国时期蜀国丞相,杰出的政治家、军事家、外交家、文学家、书法家、发明家。\n早年随叔父诸葛玄到荆州,诸葛玄死后,诸葛亮就在襄阳隆中隐居。后刘备三顾茅庐请出诸葛亮,联孙抗曹,于赤壁之战大败曹军。形成三国鼎足之势,又夺占荆州。建安十六年(211年),攻取益州。继又击败曹军,夺得汉中。蜀章武元年(221年),刘备在成都建立蜀汉政权,诸葛亮被任命为丞相,主持朝政。蜀后主刘禅继位,诸葛亮被封为武乡侯,领益州牧。勤勉谨慎,大小政事必亲自处理,赏罚严明;与东吴联盟,改善和西南各族的关系;实行屯田政策,加强战备。前后六次北伐中原,多以粮尽无功。终因积劳成疾,于蜀建兴十二年(234年)病逝于五丈原(今陕西宝鸡岐山境内),享年54岁。刘禅追封其为忠武侯,后世常以武侯尊称诸葛亮。东晋政权因其军事才能特追封他为武兴王。\n诸葛亮散文代表作有《出师表》《诫子书》等。曾发明木牛流马、孔明灯等,并改造连弩,叫做诸葛连弩,可一弩十矢俱发。诸葛亮一生“鞠躬尽瘁、死而后已”,是中国传统文化中忠臣与智者的代表人物。"));
Iterable<Person> people = repo.saveAll(personList);
log.info("【people】= {}", people);
}

/**
* 测试更新
*/
@Test
public void update() {
repo.findById(1L).ifPresent(person -> {
person.setRemark(person.getRemark() + "\n更新更新更新更新更新");
Person save = repo.save(person);
log.info("【save】= {}", save);
});
}

/**
* 测试删除
*/
@Test
public void delete() {
// 主键删除
repo.deleteById(1L);

// 对象删除
repo.findById(2L).ifPresent(person -> repo.delete(person));

// 批量删除
repo.deleteAll(repo.findAll());
}

/**
* 测试普通查询,按生日倒序
*/
@Test
public void select() {
repo.findAll(Sort.by(Sort.Direction.DESC, "birthday")).forEach(person -> log.info("{} 生日: {}", person.getName(), DateUtil.formatDateTime(person.getBirthday())));
}

/**
* 自定义查询,根据年龄范围查询
*/
@Test
public void customSelectRangeOfAge() {
repo.findByAgeBetween(18, 19).forEach(person -> log.info("{} 年龄: {}", person.getName(), person.getAge()));
}

/**
* 高级查询
*/
@Test
public void advanceSelect() {
// QueryBuilders 提供了很多静态方法,可以实现大部分查询条件的封装
MatchQueryBuilder queryBuilder = QueryBuilders.matchQuery("name", "孙权");
log.info("【queryBuilder】= {}", queryBuilder.toString());

repo.search(queryBuilder).forEach(person -> log.info("【person】= {}", person));
}

/**
* 自定义高级查询
*/
@Test
public void customAdvanceSelect() {
// 构造查询条件
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
// 添加基本的分词条件
queryBuilder.withQuery(QueryBuilders.matchQuery("remark", "东汉"));
// 排序条件
queryBuilder.withSort(SortBuilders.fieldSort("age").order(SortOrder.DESC));
// 分页条件
queryBuilder.withPageable(PageRequest.of(0, 2));
Page<Person> people = repo.search(queryBuilder.build());
log.info("【people】总条数 = {}", people.getTotalElements());
log.info("【people】总页数 = {}", people.getTotalPages());
people.forEach(person -> log.info("【person】= {},年龄 = {}", person.getName(), person.getAge()));
}

/**
* 测试聚合,测试平均年龄
*/
@Test
public void agg() {
// 构造查询条件
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
// 不查询任何结果
queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{""}, null));

// 平均年龄
queryBuilder.addAggregation(AggregationBuilders.avg("avg").field("age"));

log.info("【queryBuilder】= {}", JSONUtil.toJsonStr(queryBuilder.build()));

SearchHits<Person> searchHits = esTemplate.search(queryBuilder.build(), Person.class);
searchHits.get().forEach(hit -> {
log.info("【content】= {}", hit.getContent());
});
//获取聚合结果
if (searchHits.hasAggregations()) {
ParsedAvg parsedAvg = searchHits.getAggregations().get("avg");
Assertions.assertNotNull(parsedAvg, "无聚合结果");
log.info("【avgAge】= {}", parsedAvg.getValue());
}
}

/**
* 测试高级聚合查询,每个国家的人有几个,每个国家的平均年龄是多少
*/
@Test
public void advanceAgg() {
// 构造查询条件
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
// 不查询任何结果
queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{""}, null));

// 1. 添加一个新的聚合,聚合类型为terms,聚合名称为country,聚合字段为age
queryBuilder.addAggregation(AggregationBuilders.terms("country").field("country")
// 2. 在国家聚合桶内进行嵌套聚合,求平均年龄
.subAggregation(AggregationBuilders.avg("avg").field("age")));

log.info("【queryBuilder】= {}", JSONUtil.toJsonStr(queryBuilder.build()));

// 3. 查询
AggregatedPage<Person> people = (AggregatedPage<Person>) repo.search(queryBuilder.build());

// 4. 解析
// 4.1. 从结果中取出名为 country 的那个聚合,因为是利用String类型字段来进行的term聚合,所以结果要强转为StringTerm类型
// StringTerms country = (StringTerms) people.getAggregation("country");
// // 4.2. 获取桶
// List<StringTerms.Bucket> buckets = country.getBuckets();
// for (StringTerms.Bucket bucket : buckets) {
// // 4.3. 获取桶中的key,即国家名称 4.4. 获取桶中的文档数量
// log.info("{} 总共有 {} 人", bucket.getKeyAsString(), bucket.getDocCount());
// // 4.5. 获取子聚合结果:
// InternalAvg avg = (InternalAvg) bucket.getAggregations().asMap().get("avg");
// log.info("平均年龄:{}", avg);
// }
}
}
ElasticServiceImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package com.xmwefun.roicloud.es.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.xmwefun.roicloud.es.entity.IndexInfo;
import com.xmwefun.roicloud.es.entity.UrlEnum;
import com.xmwefun.roicloud.es.service.IElasticService;
import com.xmwefun.roicloud.es.utils.ElasticUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.search.aggregations.Aggregations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.*;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.IndexQuery;
import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import static org.springframework.data.elasticsearch.core.TotalHitsRelation.EQUAL_TO;

/**
* @Author: andyfu
* @Date: 2022/7/12 10:37 AM
*/
@Slf4j
@Service
public class ElasticServiceImpl implements IElasticService {
@Autowired
private ElasticUtil elasticUtil;

@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;

/**
* 获取所有索引
*
* @param prefix
* @return
*/
@Override
public List<IndexInfo> getAllIndex(String prefix) {
if (StringUtils.isEmpty(prefix)) {
prefix = "";
}

String url = UrlEnum.INDEX_ALL.getUrl(prefix);
String result = elasticUtil.get(url);

return JSON.parseArray(result, IndexInfo.class);
}

/**
* 根据id从索引中获取数据
*
* @param indexName
* @param id
* @return
*/
@Override
public Object getData(String indexName, String id) {
String index = org.springframework.util.StringUtils.isEmpty(indexName) ? null : indexName.toLowerCase();

IndexOperations operations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));
boolean bool = operations.exists();
if (!bool) {
return null;
}

Object obj = elasticsearchRestTemplate.get(id, Object.class, IndexCoordinates.of(index));

return obj;
}

/**
* 保存数据
*
* @param indexName 索引名称
* @param id 主键
* @param map 数据
* @return
*/
@Override
public String saveData(String indexName, String id, Map map) {
String index = transferIndex(indexName);

initData(map);
log.info("操作日志:索引{}, 数据{}", index, JSONUtil.toJsonStr(map));

IndexQuery query = new IndexQueryBuilder().withId(id).withObject(map).build();
IndexCoordinates coordinates = IndexCoordinates.of(index);
String result = elasticsearchRestTemplate.index(query, coordinates);

log.info("操作结果:{}", result);
return result;
}

/**
* 获取数据
*
* @param indexName
* @param clazz
* @param nativeSearchQuery
* @param <T>
* @return
*/
@Override
public <T> SearchHits<T> queryData(String indexName, Class clazz, NativeSearchQuery nativeSearchQuery) {
String index = org.springframework.util.StringUtils.isEmpty(indexName) ? null : indexName.toLowerCase();

IndexOperations operations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));
boolean bool = operations.exists();
if (!bool) {
long totalHits = 0;
TotalHitsRelation totalHitsRelation = EQUAL_TO;
float maxScore = 0;
String scrollId = "";
List<? extends SearchHit<T>> searchHits = new ArrayList<>();
Aggregations aggregations = new Aggregations(new ArrayList<>());
return new SearchHitsImpl(totalHits, totalHitsRelation, maxScore, scrollId, searchHits, aggregations);
}

SearchHits<T> search = elasticsearchRestTemplate.search(nativeSearchQuery, clazz, IndexCoordinates.of(index));
return search;
}

/**
* 保存数据
*
* @param indexName 索引名称
* @param mapParam 数据
* @return
*/
@Override
public String saveBatchData(String indexName, Map<String, Map> mapParam) {
String index = transferIndex(indexName);

String result = "";
if (MapUtils.isNotEmpty(mapParam)) {
List<IndexQuery> list = new ArrayList<>();

for (Map.Entry<String, Map> entry : mapParam.entrySet()) {
String id = entry.getKey();
Map map = entry.getValue();

initData(map);
log.info("操作日志:索引{}, 数据{}", index, JSONUtil.toJsonStr(map));

IndexQuery query = new IndexQueryBuilder().withId(id).withObject(map).build();
list.add(query);
}

IndexCoordinates coordinates = IndexCoordinates.of(index);
List<String> resultList = elasticsearchRestTemplate.bulkIndex(list, coordinates);

result = JSONUtil.toJsonStr(resultList);
} else {
result = "需要保存数据为空,请检查";
}
log.info("操作结果:{}", result);

return result;
}


/**
* 保存数据
*
* @param indexName 索引名称
* @param listTmp 数据
* @return
*/
@Override
public String saveBatchData(String indexName, List listTmp) {
String index = transferIndex(indexName);

String result = "";
if (CollectionUtils.isNotEmpty(listTmp)) {
List<IndexQuery> list = new ArrayList<>();

for (Object o : listTmp) {
Map map = JSON.parseObject(JSON.toJSONString(o));

initData(map);
log.info("操作日志:索引{}, 数据{}", index, JSONUtil.toJsonStr(map));

IndexQuery query = new IndexQueryBuilder().withObject(map).build();
list.add(query);
}

IndexCoordinates coordinates = IndexCoordinates.of(index);
List<String> resultList = elasticsearchRestTemplate.bulkIndex(list, coordinates);

result = JSONUtil.toJsonStr(resultList);
} else {
result = "需要保存数据为空,请检查";
}
log.info("操作结果:{}", result);

return result;
}

/**
* 索引名称校验-并指定默认类型
*
* @param indexName
* @return
*/
private String transferIndex(String indexName) {
String index = org.springframework.util.StringUtils.isEmpty(indexName) ? null : indexName.toLowerCase();

IndexOperations operations = elasticsearchRestTemplate.indexOps(IndexCoordinates.of(index));
boolean bool = operations.exists();
log.info("index: {} 存在: {}", index, bool);
if (!bool) {
Document mapping = Document.from(
JSONUtil.toBean("{\"properties\":{\"roi_createTime\":{\"type\":\"date\"}}}", Map.class)
);
Document settings = Document.from(
JSONUtil.toBean("{\"index.mapping.total_fields.limit\":5000,\"default_pipeline\":\"chage_utc_to_asiash\"}", Map.class)
);

operations.create(settings);
operations.putMapping(mapping);
}

return index;
}

/**
* 添加必要数据
*
* @param map
*/
private void initData(Map map) {
if (map != null && !map.isEmpty()) {
// 添加创建时间
map.put("roi_createTime", DateUtil.formatDateTime(new Date()));
}
}

/**
* 保存数据
*
* @param indexName 索引名称
* @param map 数据
* @return
*/
@Override
public String saveData(String indexName, Map map) {
String index = transferIndex(indexName);

initData(map);
log.info("操作日志:索引{}, 数据{}", index, JSONUtil.toJsonStr(map));

IndexQuery query = new IndexQueryBuilder().withObject(map).build();
IndexCoordinates coordinates = IndexCoordinates.of(index);
String result = elasticsearchRestTemplate.index(query, coordinates);

log.info("操作结果:{}", result);

return result;
}
}

ElasticServiceImpl.java --- demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
        NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withQuery(
QueryBuilders.boolQuery()
.should(QueryBuilders.matchPhraseQuery("phone.keyword", ""))
.should(QueryBuilders.matchPhraseQuery("union_id.keyword", ""))
.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("phone")))
.should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery("union_id")))
);
NativeSearchQuery nativeSearchQuery = builder.build();

SearchHits<Map> search = elasticService.queryData(index, Map.class, nativeSearchQuery);
log.info("需重置的数据-index: {} total: {}, query: {}", index, search.getTotalHits(), nativeSearchQuery.getQuery().toString().replaceAll("\\R", ""));

for (org.springframework.data.elasticsearch.core.SearchHit<Map> userSearchHit : search) {
String id = userSearchHit.getId();
Map content = userSearchHit.getContent();
}

// ------------------------------------

BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
// 手机号过滤
if (!StringUtils.isEmpty(phone)) {
queryBuilder.must(QueryBuilders.wildcardQuery("phone", "*" + phone + "*"));
}

// 标签过滤
List<QueryBuilder> list = Lists.newArrayList();
if (!CollectionUtils.isEmpty(tagIds)) {
for (Object obj : tagIds) {
QueryBuilder builder = QueryBuilders.existsQuery("label_" + obj);
list.add(builder);
}

BoolQueryBuilder builderTmp = QueryBuilders.boolQuery();
for (QueryBuilder tmp : list) {
if (isAnd) {
builderTmp.must(tmp);
} else {
builderTmp.should(tmp);
}
}
queryBuilder.must(builderTmp);
}
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
// builder.withQuery(QueryBuilders.matchAllQuery());
builder.withQuery(queryBuilder);

// 分页
builder.withPageable(PageRequest.of(pageIndex.intValue() - 1, pageSize.intValue()));
NativeSearchQuery nativeSearchQuery = builder.build();

SearchHits<Map> search = elasticService.queryData(index, Map.class, nativeSearchQuery);
log.info("查询标签数据-index: {} total: {}, query: {}", index, search.getTotalHits(), nativeSearchQuery.getQuery().toString().replaceAll("\\R", ""));

IPage<Map> pageVO = new Page<>();
List<Map> searchProductList = search.stream().map(SearchHit::getContent).collect(Collectors.toList());

searchProductList.stream().forEach(item -> {
Map mapTmp = (Map) item.get("openIdMap");
if (MapUtil.isNotEmpty(mapTmp)) {
item.put("userList", mapTmp.values());
}
});

TemplateTest.java --- demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
        TermsAggregationBuilder aggregationBuilder1 = AggregationBuilders.terms("openId").field("userOpenID_evar.keyword");
TermsAggregationBuilder aAggregationBuilder2 = AggregationBuilders.terms("gameId").field("gameID_evar.keyword");
TermsAggregationBuilder aAggregationBuilder3 = AggregationBuilders.terms("prizeLevel").field("prizeLevel_evar.keyword");

String startDate = map.get("roicloud_es_beginDate");
String endDate = map.get("roicloud_es_endDate");
QueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.rangeQuery("roi_createTime").gte(startDate).lte(endDate));

NativeSearchQuery nativeSearchQuery = new NativeSearchQueryBuilder()
.addAggregation(aggregationBuilder1.subAggregation(aAggregationBuilder2.subAggregation(aAggregationBuilder3)))
.withQuery(queryBuilder)
// .withPageable(PageRequest.of(0, 10000))
.build();

SearchHits searchHits = esTemplate.search(nativeSearchQuery, Map.class, IndexCoordinates.of("viewhomepage")); //scansponsorqrcode

Aggregations aggregations = searchHits.getAggregations();


String searchHitsJson = JSONUtil.toJsonStr(aggregations);
System.out.println("总条数:" + searchHits.getTotalHits());
Terms enterpriseIdTerms = aggregations.get("prizeLevel");
Terms gameIdTerms = null;
Terms userTerms = null;
if (Optional.ofNullable(enterpriseIdTerms).isPresent()) {
for (Terms.Bucket bucket : enterpriseIdTerms.getBuckets()) {
gameIdTerms = bucket.getAggregations().get("gameId");
for (Terms.Bucket bucketGame : gameIdTerms.getBuckets()) {
userTerms = bucketGame.getAggregations().get("prizeLevel");
for (Terms.Bucket bucketUsesr : userTerms.getBuckets()) {
System.out.println("openId=" + bucket.getKey() + " gameId=" + bucketGame.getKey() + " prizeLevel=" + bucketUsesr.getKey() + " 领奖次数=" + bucketUsesr.getDocCount());
}
}
}
}

springboot 和 es 版本对应
es 官网 api
版本差异

  • es7 以后建议使用 high-level REST client
  • application.yml 配置不一样

es 查询语法

使用dql示例

  1. 单字段查询: term 不支持分词、match 只要能匹配上分词,就匹配上了

query.term.columnName
query.match.columnName
query.terms.columnName[]
query.range.columnName.gte

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1.sql示例
select * from user where address="抚州"

2.es查询
GET index3/_search
{
"query": {
"match": {
"address": {
"value": "抚州"
}
}
}
}

3. 默认参数
{
"query": {
"bool": {
"must": [
{
"term": {
"user.user_name.keyword": "李大侠"
}
}
],
"must_not": [],
"should": []
}
},
"from": 0,
"size": 2,
"search_after": [
"10"
],
"sort": [
{
"total_amount": "asc"
}
],
"aggs": {}
}
  1. 多字段查询
    or === query.bool.should[].term.columnName
    and === query.bool.must[].term.columnName

注意点 && 问题

注意点

  1. 索引创建后,如果没有数据可以设置 mapping,重复设置 mapping; 如果存在数据,则修改 mapping 失败
  2. 相同别名指向不同索引,会导致插入数据失败

问题

  1. es 停机后,数据会同步异常,logstash 同步出错
  2. 单节点如何扩展成集群
  3. mapping 修改,解决方案
  4. kibana 报表标题各字段中文映射问题

参考

  1. Docker 下安装 ElasticSearch 和 Kibana
  2. elasticsearch 权威指南-中文.pdf
  3. es 查询语法
  4. es 查询语法示例
  5. es 更新文档
  6. es 更新文档-添加参数
  7. es 官网 应用层联接
  8. es 时间时区问题
  9. es java分组查询示例