喵星之旅-狂奔的兔子-使用java对es7进行基本操作

这里选择使用 RestHighLevelClient 进行操作,Elasticsearch 版本为 7.6.2。

添加依赖

参看:喵星之旅-狂奔的兔子-springboot添加es7依赖

配置

yml添加配置参数

1
2
3
4
5
# Elasticsearch connection settings; read by ElasticsearchConfig through
# the ${kysjkbunny.elasticsearch.*} placeholders, so the nesting below is required.
kysjkbunny:
  elasticsearch:
    hostname: 127.0.0.1
    port: 9205
    scheme: http

配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package org.jeecg.modules.demo.es.config;


import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * Spring configuration that builds the application's {@link RestHighLevelClient}
 * from the {@code kysjkbunny.elasticsearch.*} properties.
 *
 * @author bunny
 */
@Slf4j
@Configuration
public class ElasticsearchConfig {

    /** Connect and socket timeout in milliseconds (90,000,000 ms = 25 hours). */
    private static final int TIMEOUT_MILLIS = 90000000;

    /** Duration, in minutes, reported by the connection keep-alive strategy. */
    private static final long KEEP_ALIVE_MINUTES = 3;

    @Value("${kysjkbunny.elasticsearch.hostname}")
    private String host;

    @Value("${kysjkbunny.elasticsearch.port}")
    private Integer port;

    @Value("${kysjkbunny.elasticsearch.scheme}")
    private String scheme;

    /**
     * Creates the high-level REST client pointing at the single configured node.
     *
     * <p>The request config applies {@link #TIMEOUT_MILLIS} to both the connect and the
     * socket timeout. The HTTP client has auth caching disabled, uses a fixed keep-alive
     * duration and enables {@code SO_KEEPALIVE} on the I/O reactor.
     *
     * @return the configured client
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        HttpHost node = new HttpHost(host, port, scheme);

        RestClientBuilder lowLevelBuilder = RestClient.builder(node)
                .setRequestConfigCallback(requestConfig -> requestConfig
                        .setConnectTimeout(TIMEOUT_MILLIS)
                        .setSocketTimeout(TIMEOUT_MILLIS))
                .setHttpClientConfigCallback(httpClient -> httpClient
                        .disableAuthCaching()
                        .setKeepAliveStrategy(
                                (response, context) -> TimeUnit.MINUTES.toMillis(KEEP_ALIVE_MINUTES))
                        .setDefaultIOReactorConfig(
                                IOReactorConfig.custom().setSoKeepAlive(true).build()));

        return new RestHighLevelClient(lowLevelBuilder);
    }
}

基础工具类

实现对索引的基本操作和对数据的基本操作(不包含复杂查询)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
package org.jeecg.modules.demo.es.util;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.GeoDistanceQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Elasticsearch helper: basic index management and single-document / bulk operations
 * on top of {@link RestHighLevelClient}, plus a simple paged search.
 *
 * @author bunny
 */
@Component
public class ElasticsearchUtil {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /** Number of records to process per batch when synchronising data. */
    public static final int SIZE = 100;

    /**
     * Creates an index. Not expected to be called in normal operation; use with care.
     *
     * <p>The index is created with {@code index.max_result_window} raised to
     * 2,000,000,000 so that searches can page past the default 10,000-hit window.
     *
     * @param index name of the index to create
     * @return {@code true} if the creation was acknowledged; {@code false} if the index
     *         already exists or the creation was not acknowledged
     * @throws IOException if the request to Elasticsearch fails
     */
    public boolean createIndex(String index) throws IOException {
        if (isIndexExist(index)) {
            return false;
        }
        // 1. Build the create-index request.
        CreateIndexRequest request = new CreateIndexRequest(index);
        // Lift the 10,000-result window limit.
        request.settings(Settings.builder()
                .put("index.max_result_window", 2000000000)
        );
        // 2. Execute the request.
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    }

    /**
     * Checks whether an index exists.
     *
     * @param index name of the index to check
     * @return {@code true} if the index exists, otherwise {@code false}
     * @throws IOException if the request to Elasticsearch fails
     */
    public boolean isIndexExist(String index) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Deletes an entire index. Not expected to be called in normal operation.
     *
     * @param index name of the index to delete
     * @return {@code false} if the index does not exist; otherwise whether the deletion
     *         was acknowledged
     * @throws IOException if the request to Elasticsearch fails
     */
    public boolean deleteIndex(String index) throws IOException {
        if (!isIndexExist(index)) {
            return false;
        }
        DeleteIndexRequest request = new DeleteIndexRequest(index);
        AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
        return delete.isAcknowledged();
    }

    /**
     * Adds one document to an index. The object is serialised to JSON with fastjson;
     * no id is set on the request, and the request uses a 1-second timeout.
     *
     * @param object the data object to store
     * @param index  name of the target index
     * @return the id of the indexed document as reported by the response
     * @throws IOException if the request to Elasticsearch fails
     */
    public String addData(Object object, String index) throws IOException {
        IndexRequest request = new IndexRequest(index);
        request.timeout(TimeValue.timeValueSeconds(1));
        // Put the JSON-serialised payload into the request.
        request.source(JSON.toJSONString(object), XContentType.JSON);
        IndexResponse response = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        return response.getId();
    }

    /**
     * Deletes one document by index and id. If the document does not exist,
     * nothing is deleted.
     *
     * @param index name of the index
     * @param id    id of the document
     * @throws IOException if the request to Elasticsearch fails
     */
    public void deleteDataById(String index, String id) throws IOException {
        DeleteRequest request = new DeleteRequest(index);
        request.id(id);
        restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    }

    /**
     * Updates a document by id, sending the JSON-serialised object as the update
     * {@code doc}. The request uses a {@code "1s"} timeout.
     *
     * @param object the data to apply
     * @param index  name of the index
     * @param id     id of the document
     * @throws IOException if the request to Elasticsearch fails
     */
    public void updateDataById(Object object, String index, String id) throws IOException {
        UpdateRequest update = new UpdateRequest();
        update.index(index);
        update.id(id);
        update.timeout("1s");
        update.doc(JSON.toJSONString(object), XContentType.JSON);
        restHighLevelClient.update(update, RequestOptions.DEFAULT);
    }

    /**
     * Fetches a document by id.
     *
     * @param index  name of the index
     * @param id     id of the document
     * @param fields comma-separated list of fields to return; when null or empty,
     *               all fields are returned
     * @return the document source as returned by {@code GetResponse.getSource()}
     *         (NOTE(review): presumably null when the document is missing; confirm)
     * @throws IOException if the request to Elasticsearch fails
     */
    public Map<String, Object> searchDataById(String index, String id, String fields) throws IOException {
        GetRequest request = new GetRequest();
        request.index(index);
        request.id(id);
        if (StringUtils.isNotEmpty(fields)) {
            // Restrict the returned source to the requested fields only.
            request.fetchSourceContext(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
        }
        GetResponse response = restHighLevelClient.get(request, RequestOptions.DEFAULT);
        return response.getSource();
    }

    /**
     * Checks whether a document exists, without fetching its source or stored fields.
     *
     * @param index name of the index
     * @param id    id of the document
     * @return {@code true} if the document exists
     * @throws IOException if the request to Elasticsearch fails
     */
    public boolean existsById(String index, String id) throws IOException {
        GetRequest request = new GetRequest();
        request.index(index);
        request.id(id);
        // Do not fetch _source or any stored fields; only existence matters.
        request.fetchSourceContext(new FetchSourceContext(false));
        request.storedFields("_none_");
        return restHighLevelClient.exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Bulk-inserts the given objects into an index. Callers should keep the batch
     * below 200,000 items (limit stated by the original author).
     *
     * <p>Note the inverted result: {@code false} means success.
     *
     * @param index   name of the target index
     * @param objects the objects to insert, each serialised to JSON with fastjson
     * @return {@code true} if at least one item failed, {@code false} if every item
     *         succeeded or there was nothing to insert
     * @throws java.io.UncheckedIOException if the bulk request itself fails; the
     *         original code swallowed the {@link IOException} and then crashed with
     *         a {@link NullPointerException}
     */
    public boolean bulkPost(String index, List<?> objects) {
        if (objects == null || objects.isEmpty()) {
            // A bulk request with no actions is rejected by request validation;
            // nothing to insert means nothing failed.
            return false;
        }
        BulkRequest bulkRequest = new BulkRequest();
        for (Object object : objects) {
            IndexRequest request = new IndexRequest(index);
            request.source(JSON.toJSONString(object), XContentType.JSON);
            bulkRequest.add(request);
        }
        try {
            BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            return response.hasFailures();
        } catch (IOException e) {
            // Surface the real cause instead of an NPE on a null response.
            throw new java.io.UncheckedIOException("Bulk insert into index '" + index + "' failed", e);
        }
    }

    /**
     * Searches for documents whose {@code location} field lies within a given
     * distance of a centre point.
     *
     * @param index     name of the index
     * @param longitude longitude of the centre point
     * @param latitude  latitude of the centre point
     * @param distance  radius around the centre point, in kilometres
     * @return the raw search response, or {@code null} when either coordinate is null
     * @throws IOException if the request to Elasticsearch fails
     */
    public SearchResponse geoDistanceQuery(String index, Float longitude, Float latitude, String distance) throws IOException {
        if (longitude == null || latitude == null) {
            return null;
        }
        // Centre point plus radius; the point is given as (lat, lon).
        GeoDistanceQueryBuilder distanceQueryBuilder = new GeoDistanceQueryBuilder("location");
        distanceQueryBuilder.point(latitude, longitude);
        distanceQueryBuilder.distance(distance, DistanceUnit.KILOMETERS);

        // Only the distance filter is applied; the original built an "isdelete"
        // term query but never added it to the bool query.
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.filter(distanceQueryBuilder);

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQueryBuilder);
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.source(searchSourceBuilder);
        return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    }


    /**
     * Converts search hits into a list of source maps, replacing the value of
     * {@code highlightField} with its concatenated highlight fragments when the hit
     * has a highlight for that field.
     *
     * <p>A map can be converted to an object with
     * {@code JSONObject.parseObject(JSONObject.toJSONString(map), Content.class)}.
     *
     * @param searchResponse the response to convert
     * @param highlightField name of the highlighted field
     * @return one map per hit, in hit order; a hit without a source yields a map
     *         holding only the highlighted value (or an empty map)
     */
    private List<Map<String, Object>> setSearchResponse(SearchResponse searchResponse, String highlightField) {
        List<Map<String, Object>> list = new ArrayList<>();
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            if (sourceAsMap == null) {
                // The hit carries no _source (e.g. source fetching disabled); avoid
                // an NPE on put() and avoid handing null entries to callers.
                sourceAsMap = new java.util.HashMap<String, Object>();
            }
            HighlightField title = hit.getHighlightFields().get(highlightField);
            if (title != null && title.fragments() != null) {
                // Join all fragments and use them in place of the original value.
                StringBuilder joined = new StringBuilder();
                for (Text text : title.fragments()) {
                    joined.append(text);
                }
                sourceAsMap.put(highlightField, joined.toString());
            }
            list.add(sourceAsMap);
        }
        return list;
    }

    /**
     * Runs a paged search and returns the hits as source maps.
     *
     * <p>The passed-in {@code query} builder is modified in place (source filter,
     * paging, sort, highlighter).
     *
     * @param index          name of the index
     * @param query          search source carrying the query; when null an empty
     *                       builder is used
     * @param from           zero-based page number; null or values &lt;= 0 mean the
     *                       first page. The hit offset is {@code from * size}
     * @param size           page size; when null, no paging is set on the builder
     * @param fields         comma-separated list of fields to return; when null or
     *                       empty, all fields are returned
     * @param sortField      field to sort ascending by; {@code ".keyword"} is always
     *                       appended, so it is meant for text fields with a keyword
     *                       sub-field. Ignored when null or empty
     * @param highlightField field to highlight; ignored when null or empty
     * @return the hits as maps, or {@code null} when the response status is not 200
     * @throws IOException         if the request to Elasticsearch fails
     * @throws ArithmeticException if {@code from * size} overflows an int
     */
    public List<Map<String, Object>> searchListData(String index, SearchSourceBuilder query, Integer from, Integer size, String fields, String sortField, String highlightField) throws IOException {
        SearchRequest request = new SearchRequest(index);
        SearchSourceBuilder builder = query != null ? query : new SearchSourceBuilder();
        if (StringUtils.isNotEmpty(fields)) {
            // Restrict the returned source to the requested fields only.
            builder.fetchSource(new FetchSourceContext(true, fields.split(","), Strings.EMPTY_ARRAY));
        }
        if (size != null) {
            int page = (from == null || from <= 0) ? 0 : from;
            // multiplyExact: fail loudly instead of silently wrapping to a wrong offset.
            builder.from(Math.multiplyExact(page, size));
            builder.size(size);
        }
        if (StringUtils.isNotEmpty(sortField)) {
            // Text fields are sorted through their keyword sub-field.
            builder.sort(sortField + ".keyword", SortOrder.ASC);
        }
        if (StringUtils.isNotEmpty(highlightField)) {
            HighlightBuilder highlight = new HighlightBuilder();
            highlight.field(highlightField);
            // Highlight the field even if the query did not match on it.
            highlight.requireFieldMatch(false);
            highlight.preTags("<span style='color:red'>");
            highlight.postTags("</span>");
            builder.highlighter(highlight);
        }
        // The original called builder.fetchSource(false) here, which discarded the
        // field filter above and left every hit without a source, so the result
        // conversion either threw an NPE or returned a list of nulls.
        request.source(builder);
        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        if (response.status().getStatus() == 200) {
            return setSearchResponse(response, highlightField);
        }
        return null;
    }

}

测试类

编写测试接口,验证部分功能,并结合 Kibana 查看数据结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package org.jeecg.modules.demo.es.controller;

import org.jeecg.modules.demo.es.util.ElasticsearchUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Manual test endpoints that exercise {@link ElasticsearchUtil}: index creation and
 * deletion, and adding, deleting and updating a single document with random content.
 */
@RestController
public class TestEs {

    @Autowired
    private ElasticsearchUtil elasticsearchUtil;

    /**
     * Creates the index called {@code name}.
     *
     * @return the boolean result of the creation, as text
     */
    @RequestMapping("/bunnytestes")
    public String test(@RequestParam(name="name") String name) throws IOException {
        return String.valueOf(elasticsearchUtil.createIndex(name));
    }

    /**
     * Deletes the index called {@code name}.
     *
     * @return the boolean result of the deletion, as text
     */
    @RequestMapping("/bunnytestes/del")
    public String del(@RequestParam(name="name") String name) throws IOException {
        return String.valueOf(elasticsearchUtil.deleteIndex(name));
    }

    /**
     * Adds a document with a random UUID {@code name} and a random {@code age}
     * to the index called {@code name}.
     *
     * @return the id returned for the new document
     */
    @RequestMapping("/bunnytestes/dtoadd")
    public String dtoadd(@RequestParam(name="name") String name) throws IOException {
        Map<String, String> document = new HashMap<>();
        document.put("name", UUID.randomUUID().toString());
        document.put("age", String.valueOf(Math.random()));
        return String.valueOf(elasticsearchUtil.addData(document, name));
    }

    /**
     * Deletes document {@code id} from the index called {@code name}.
     *
     * @return {@code "ok"} followed by a random number
     */
    @RequestMapping("/bunnytestes/dtodel")
    public String dtodel(@RequestParam(name="name") String name,@RequestParam(name="id") String id) throws IOException {
        elasticsearchUtil.deleteDataById(name, id);
        return "ok" + Math.random();
    }

    /**
     * Updates document {@code id} in the index called {@code name} with a fresh
     * random {@code name} and {@code age}.
     *
     * @return {@code "ok"} followed by a random number
     */
    @RequestMapping("/bunnytestes/dtoupdate")
    public String dtoupdate(@RequestParam(name="name") String name,@RequestParam(name="id") String id) throws IOException {
        Map<String, String> changes = new HashMap<>();
        changes.put("name", UUID.randomUUID().toString());
        changes.put("age", "hehe" + Math.random());
        elasticsearchUtil.updateDataById(changes, name, id);
        return "ok" + Math.random();
    }

}

文章目录
  1. 添加依赖
  2. 配置
  3. 基础工具类
  4. 测试类
|