Springboot整合ES8(Java API Client)

家电修理 2023-07-16 19:17www.caominkang.com电器维修

在 Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端 RestHighLevelClient标记为弃用状态。推出了全新的 Java API客户端 Elasticsearch Java API Client,该客户端也将在 Elasticsearch8.0及以后版本中成为官方推荐使用的客户端。

Elasticsearch Java API Client 支持除 Vector tile search API 和 Find structure API 之外的所有 Elasticsearch API。且支持所有API数据类型,并且不再有原始JsonValue属性。它是针对Elasticsearch8.0及之后版本的客户端,所以我们需要学习新的Elasticsearch Java API Client的使用方法。

前面使用的是 ES 7.x的版本,这次使用 ES 8.1.3版本。使用Docker搭建环境还是蛮简单的。

一、Springboot整合ES8

引入依赖

	
 
   co.elastic.clients
   elasticsearch-java
   8.1.3
 

 
   .fasterxml.jackson.core
   jackson-databind
   2.13.3
 

注意

  • ES8 API使用大量的Builder模式,所以我们在使用API时,也尽量使用它。
1、ES配置类

Java连接 Elasticsearch8的核心步骤

  • 先创建 RestClient。
  • 再基于 RestClient创建 ElasticsearchTransport。
  • 基于 ElasticsearchTransport创建 ElasticsearchClient。

不论是直连,还是带安全检查的连接,都是这个不变的核心步骤。根据业务可能会会加一点参数配置等。

// 配置的前缀
@ConfigurationProperties(prefix = "elasticsearch") 
@Configuration
public class ESClientConfig {

	
	@Setter
	private String hosts;

	
	@Bean
	public ElasticsearchClient elasticsearchClient() {
		HttpHost[] httpHosts = toHttpHost();
		// Create the RestClient 
		RestClient restClient = RestClient.builder(httpHosts).build();
		// Create the transport ith a Jackson mapper
		RestClientTransport transport = ne RestClientTransport(restClient, ne JacksonJsonpMapper());
		// create the API client
		return ne ElasticsearchClient(transport);
	}

	
	@Bean
	public ElasticsearchAsyncClient elasticsearchAsyncClient() {
		HttpHost[] httpHosts = toHttpHost();
		RestClient restClient = RestClient.builder(httpHosts).build();
		RestClientTransport transport = ne RestClientTransport(restClient, ne JacksonJsonpMapper());
		return ne ElasticsearchAsyncClient(transport);
	}

	
	private HttpHost[] toHttpHost() {
		if (!StringUtils.hasLength(hosts)) {
			thro ne RuntimeException("invalid elasticsearch configuration. elasticsearch.hosts不能为空!");
		}

		// 多个IP逗号隔开
		String[] hostArray = hosts.split(",");
		HttpHost[] httpHosts = ne HttpHost[hostArray.length];
		HttpHost httpHost;
		for (int i = 0; i < hostArray.length; i++) {
			String[] strings = hostArray[i].split(":");
			httpHost = ne HttpHost(strings[0], Integer.parseInt(strings[1]), "http");
			httpHosts[i] = httpHost;
		}

		return httpHosts;
	}

}
2、配置文件

在application.yml配置文件中添加 ES的服务地址等信息。

## ES配置@ConfigurationProperties(prefix = "elasticsearch") //配置的前缀
elasticsearch:
  # 多个IP逗号隔开
  hosts: 192.168.xxx.xxx:9200
3、单元测试

启动类没什么变化,和以前一样。我们直接写一个测试类来操作ES。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ESClientConfigTest {

	@Autoired
	private ElasticsearchClient client;

	
	@Test
	public void createIndex() thros IOException {

		CreateIndexResponse products = client.indices().create(c -> c.index("db_idx5"));
		System.out.println(products.acknoledged());

	}

	
	@Test
	public void createExi() thros IOException {
		BooleanResponse exists = client.indices().exists(e -> e.index("db_idx5"));
		System.out.println(exists.value());

	}

}

测试ok,到此Springboot整合ES8就ok了。

二、索引操作 1、业务类

1)接口

public interface IndexService {

	
	void createIndex(String name) thros IOException;

	
	void createIndex(String name, Function> settingFn,
			Function> mappingFn) thros IOException;

	
	void deleteIndex(String name) thros IOException;

	
	void updateIndexProperty(String name, HashMap propertyMap) thros IOException;

	
	GetIndexResponse getIndexList() thros IOException;

	
	GetIndexResponse getIndexDetail(String name) thros IOException;

	
	boolean indexExists(String name) thros IOException;

}

2)实现类

@Service
@Slf4j
public class IndexServiceImpl implements IndexService {

 @Autoired
 private ElasticsearchClient elasticsearchClient;

 @Override
 public void createIndex(String name) thros IOException {
  //ApplicationContext applicationContext;
  CreateIndexResponse response = elasticsearchClient.indices().create(c -> c.index(name));
  log.info("createIndex方法,acknoledged={}", response.acknoledged());
 }

 @Override
 public void createIndex(String name,
        Function> settingFn,
        Function> mappingFn) thros IOException {
  CreateIndexResponse response = elasticsearchClient
    .indices()
    .create(c -> c
      .index(name)
      .settings(settingFn)
      .mappings(mappingFn)
    );
  log.info("createIndex方法,acknoledged={}", response.acknoledged());
 }

 @Override
 public void deleteIndex(String name) thros IOException {
  DeleteIndexResponse response = elasticsearchClient.indices().delete(c -> c.index(name));
  log.info("deleteIndex方法,acknoledged={}", response.acknoledged());
 }

 @Override
 public void updateIndexProperty(String name, HashMap propertyMap) thros IOException {
  PutMappingResponse response = elasticsearchClient.indices()
    .putMapping(typeMappingBuilder ->
        typeMappingBuilder
        .index(name)
        .properties(propertyMap)
  );
  log.info("updateIndexMapping方法,acknoledged={}", response.acknoledged());
 }

 @Override
 public GetIndexResponse getIndexList() thros IOException {
  //使用  或者 _all都可以
  GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index("_all"));
  log.info("getIndexList方法,response.result()={}", response.result().toString());
  return response;
 }

 @Override
 public GetIndexResponse getIndexDetail(String name) thros IOException {
  GetIndexResponse response = elasticsearchClient.indices().get(builder -> builder.index(name));
  log.info("getIndexDetail方法,response.result()={}", response.result().toString());
  return response;
 }

 @Override
 public boolean indexExists(String name) thros IOException {
  return elasticsearchClient.indices().exists(b -> b.index(name)).value();
 }
2、单元测试
 @Autoired
 private IndexService indexService;

 @Test
 public void testCreateIndex() thros Exception {
  String indexName = "db_api_idx1";
  indexService.createIndex(indexName);

  //Assertions.assertTrue(indexService.indexExists(indexName));
  //indexService.createIndex(indexName);
  //Assertions.assertFalse(indexService.indexExists(indexName));
 }

 @Test
 public void testCreateIndex2() thros Exception {
  // 索引名
  String indexName = "db_api_idx2";

  // 构建setting
  Function> settingFn = sBuilder -> sBuilder
    .index(iBuilder -> iBuilder
      // 三个分片
      .numberOfShards("3")
      // 一个副本
      .numberOfReplicas("1")
    );
  // 索引字段,每个字段都有自己的property
  Property keyordProperty = Property.of(pBuilder -> pBuilder.keyord(keyordPropertyBuilder -> keyordPropertyBuilder.ignoreAbove(256)));
  Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder));
  Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder));

  // 构建mapping
  Function> mappingFn = mBuilder -> mBuilder
    .properties("name", keyordProperty)
    .properties("age", integerProperty)
    .properties("description", textProperty);

  // 创建索引,并指定setting和mapping
  indexService.createIndex(indexName, settingFn, mappingFn);
 }

 @Test
 public void testIndexExists() thros Exception {
  String indexName = "db_api_idx1";
  System.out.println(indexService.indexExists(indexName));
 }

 @Test
 public void testUpdateIndexProperty() thros Exception {
  String indexName = "db_api_idx2";

  // 索引字段,每个字段都有自己的property
  Property keyordProperty = Property.of(pBuilder -> pBuilder.keyord(keyordPropertyBuilder -> keyordPropertyBuilder.ignoreAbove(1024)));
  Property integerProperty = Property.of(pBuilder -> pBuilder.integer(integerNumberPropertyBuilder -> integerNumberPropertyBuilder));
  Property textProperty = Property.of(pBuilder -> pBuilder.text(tBuilder -> tBuilder));

  HashMap propertyMap = ne HashMap<>();
  propertyMap.put("name", keyordProperty);
  propertyMap.put("description", textProperty);
  propertyMap.put("address", textProperty);

  // 构建mapping
  indexService.updateIndexProperty(indexName, propertyMap);
 }

 @Test
 public void testGetIndexList() thros Exception {
  indexService.getIndexList();
 }

 @Test
 public void testGetIndexDetail() thros Exception {
  String indexName = "db_api_idx2";
  indexService.getIndexDetail(indexName);
 }

 @Test
 public void testDeleteIndex() thros Exception {
  String indexName = "db_api_idx1";
  indexService.deleteIndex(indexName);
 }
三、文档操作 1、业务类

注意

  • 接口中的 Object类,可以定义成我们的业务实体类。
  • 如果写的通用点,通过泛型,我们可以自定义一个基类,所有业务实体类继承它。

1)接口

public interface DocumentDemoService {

 
 IndexResponse createByFluentDSL(String idxName, String idxId, Object document) thros Exception;

 
 IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) thros Exception;

 
 IndexResponse createByJson(String idxName, String idxId, String jsonContent) thros Exception;


 
 void createAsync(String idxName, String idxId, Object document, BiConsumer action);

 
 BulkResponse bulkCreate(String idxName, List documents) thros Exception;


 
 Object getById(String idxName, String docId) thros IOException;

 
 ObjectNode getObjectNodeById(String idxName, String docId) thros IOException;

 
 Boolean deleteById(String idxName, String docId) thros IOException;

 
 BulkResponse bulkDeleteByIds(String idxName, List docIds) thros Exception;

}
 

2)实现类

@Slf4j
@Service
public class DocumentDemoServiceImpl implements DocumentDemoService {

 @Autoired
 private ElasticsearchClient elasticsearchClient;

 @Autoired
 private ElasticsearchAsyncClient elasticsearchAsyncClient;

 @Override
 public IndexResponse createByFluentDSL(String idxName, String idxId, Object document) thros Exception {
  IndexResponse response = elasticsearchClient.index(idx -> idx
    .index(idxName)
    .id(idxId)
    .document(document));
  return response;
 }

 @Override
 public IndexResponse createByBuilderPattern(String idxName, String idxId, Object document) thros Exception {
  IndexRequest.Builder indexReqBuilder = ne IndexRequest.Builder<>();

  indexReqBuilder.index(idxName);
  indexReqBuilder.id(idxId);
  indexReqBuilder.document(document);
  return elasticsearchClient.index(indexReqBuilder.build());
 }

 @Override
 public IndexResponse createByJson(String idxName, String idxId, String jsonContent) thros Exception {
  return elasticsearchClient.index(i -> i
    .index(idxName)
    .id(idxId)
    .ithJson(ne StringReader(jsonContent))
  );
 }

 @Override
 public void createAsync(String idxName, String idxId, Object document, BiConsumer action) {
  elasticsearchAsyncClient.index(idx -> idx
    .index(idxName)
    .id(idxId)
    .document(document)
  ).henComplete(action);
 }

 @Override
 public BulkResponse bulkCreate(String idxName, List documents) thros Exception {
  BulkRequest.Builder br = ne BulkRequest.Builder();

  // TODO 可以将 Object定义为一个文档基类。比如 ESDocument类

  // 将每一个product对象都放入builder中
  //documents.stream()
  //  .forEach(esDocument -> br
  //    .operations(op -> op
  //      .index(idx -> idx
  //        .index(idxName)
  //        .id(esDocument.getId())
  //        .document(esDocument))));

  return elasticsearchClient.bulk(br.build());
 }

 @Override
 public Object getById(String idxName, String docId) thros IOException {
  GetResponse response = elasticsearchClient.get(g -> g
      .index(idxName)
      .id(docId),
    Object.class);
  return response.found() ? response.source() : null;
 }

 @Override
 public ObjectNode getObjectNodeById(String idxName, String docId) thros IOException {
  GetResponse response = elasticsearchClient.get(g -> g
      .index(idxName)
      .id(docId),
    ObjectNode.class);

  return response.found() ? response.source() : null;
 }

 @Override
 public Boolean deleteById(String idxName, String docId) thros IOException {
  DeleteResponse delete = elasticsearchClient.delete(d -> d
    .index(idxName)
    .id(docId));
  return delete.forcedRefresh();
 }

 @Override
 public BulkResponse bulkDeleteByIds(String idxName, List docIds) thros Exception {
  BulkRequest.Builder br = ne BulkRequest.Builder();

  // 将每一个对象都放入builder中
  docIds.stream().forEach(id -> br
      .operations(op -> op
        .delete(d -> d
          .index(idxName)
          .id(id))));

  return elasticsearchClient.bulk(br.build());
 }
}
 
2、单元测试 
 private final static String INDEX_NAME = "db_api_idx_uservo";

 @Autoired
 private DocumentDemoService documentDemoService;


 @Test
 public void testCreateByFluentDSL() thros Exception {
  // 构建文档数据
  UserVO userVO = ne UserVO();
  userVO.setId(1L);
  userVO.setUserName("赵云2");
  userVO.setAge(11);
  userVO.setCreateTime(ne Date());
  userVO.setUpdateTime(ne Date());
  userVO.setEmail("ss.");
  userVO.setVersion(1);
  userVO.setHeight(12D);

  // 新增一个文档
  IndexResponse response = documentDemoService.createByFluentDSL(INDEX_NAME, userVO.getId().toString(), userVO);

  System.out.println("response.forcedRefresh() -> " + response.forcedRefresh());
  System.out.println("response.toString() -> " + response.toString());
 }

 @Test
 public void testCreateByBuilderPattern() thros Exception {
  // 构建文档数据
  UserVO userVO = ne UserVO();
  userVO.setId(2L);
  userVO.setUserName("赵云2");
  userVO.setAge(12);
  userVO.setCreateTime(ne Date());
  userVO.setUpdateTime(ne Date());
  userVO.setEmail("ss.");
  userVO.setVersion(1);
  userVO.setHeight(12D);

  // 新增一个文档
  IndexResponse response = documentDemoService.createByBuilderPattern(INDEX_NAME, userVO.getId().toString(), userVO);

  System.out.println("response.toString() -> " + response.toString());
 }

 @Test
 public void testCreateByJSON() thros Exception {
  // 构建文档数据
  UserVO userVO = ne UserVO();
  userVO.setId(3L);
  userVO.setUserName("赵云3");
  userVO.setAge(13);
  userVO.setCreateTime(ne Date());
  userVO.setUpdateTime(ne Date());
  userVO.setEmail("ss.");
  userVO.setVersion(1);
  userVO.setHeight(12D);

  // 新增一个文档
  IndexResponse response = documentDemoService.createByJson(INDEX_NAME, userVO.getId().toString(), JSON.toJSONString(userVO));

  System.out.println("response.toString() -> " + response.toString());
 }

 @Test
 public void testCreateAsync() thros Exception {
  // 构建文档数据
  UserVO userVO = ne UserVO();
  userVO.setId(4L);
  userVO.setUserName("赵云4");
  userVO.setAge(14);
  userVO.setCreateTime(ne Date());
  userVO.setUpdateTime(ne Date());
  userVO.setEmail("ss.");
  userVO.setVersion(1);
  userVO.setHeight(12D);

  documentDemoService.createAsync(INDEX_NAME, userVO.getId().toString(), userVO, ne BiConsumer<>() {
   @Override
   public void aept(IndexResponse indexResponse, Throable throable) {
    // throable必须为空
    Assertions.assertNull(throable);
    // 验证结果
    System.out.println("response.toString() -> " + indexResponse.toString());
   }
  });
 }

 @Test
 public void testBulkCreate() thros Exception {
  int start = 5;
  int end = 10;

  // 构造文档集合
  List list = ne ArrayList<>();
  for (int i = 5; i <= 7; i++) {
   UserVO userVO = ne UserVO();
   userVO.setId(Long.valueOf(i));
   userVO.setUserName("赵云batch" + i );
   userVO.setHeight(1.88D);
   userVO.setAge(10 + i);
   userVO.setCreateTime(ne Date());
   list.add(userVO);
  }

  // 批量新增
  BulkResponse response = documentDemoService.bulkCreate(INDEX_NAME, list);
  List items = response.items();
  for (BulkResponseItem item : items) {
   System.out.println("BulkResponseItem.toString() -> " + item.toString());
  }
 }

 @Test
 public void testGetById() thros Exception {
  Long id = 1L;
  Object object = documentDemoService.getById(INDEX_NAME, id.toString());

  System.out.println("object ->" + object);
  // 无法直接强转,会报错
  //UserVO userVO = (UserVO) object;
  //System.out.println("userVO ->" + object);

 }

 @Test
 public void testGetObjectNode() thros Exception {
  Long id = 1L;
  ObjectNode objectNode = documentDemoService.getObjectNodeById(INDEX_NAME, id.toString());

  Assertions.assertNotNull(objectNode);
  System.out.println("id ->" + objectNode.get("id").asLong());
  System.out.println("userName ->" + objectNode.get("userName").asText());
 }
 

– 求知若饥,虚心若愚。

Copyright © 2016-2025 www.caominkang.com 曹敏电脑维修网 版权所有 Power by