搜索引擎项目
帖子搜索
数据同步
- 全量同步至 ES：项目启动时将数据库中的全部数据写入 ES
- 定时增量同步至 ES：每分钟执行一次，保存前根据时间筛选数据，避免重复保存
搜索
用户搜索
数据同步
- 全量同步至 ES：项目启动时将数据库中的全部数据写入 ES
- 实时增量同步至 ES：通过 Canal 监听 MySQL binlog，用户信息一旦发生变更立即同步至 ES
搜索
实现过程
1.创建user的es实体类
UserEsDao
继承ElasticsearchRepository
实现基本的CRUD操作
UserEsDTO
用户ES包装类, 与ES中user文档相对应
2.使用dev tools
创建user文档
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| PUT user_v1 { "aliases": { "user": {} }, "mappings": { "properties": { "userName": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 64 } } }, "userProfile": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "userAvatar": { "type": "keyword" }, "createTime": { "type": "date" }, "updateTime": { "type": "date" }, "isDelete": { "type": "keyword" } } } }
|
3.实现启动时全量同步至ES
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Component @Slf4j public class FullSyncUserToEs implements CommandLineRunner {
@Resource private UserService userService;
@Resource private UserEsDao userEsDao;
@Override public void run(String... args) { userEsDao.deleteAll();
List<User> userList = userService.list(); if (CollUtil.isEmpty(userList)) { return; } List<UserEsDTO> userEsDTOList = userList.stream().map(UserEsDTO::objToDto).collect(Collectors.toList()); final int pageSize = 500; int total = userEsDTOList.size(); log.info("FullSyncUserToEs start, total {}", total); for (int i = 0; i < total; i += pageSize) { int end = Math.min(i + pageSize, total); log.info("sync from {} to {}", i, end); userEsDao.saveAll(userEsDTOList.subList(i, end)); } log.info("FullSyncUserToEs end, total {}", total); } }
|
4.实现用户数据增量同步至ES
JDK17使用Canal的注意事项
启动时报错: Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
这是因为 JDK 8 起永久代（PermGen）已被移除，JDK 17 不再识别 `-XX:PermSize` 参数，因此需要将 start.bat 中的：
set JAVA_MEM_OPTS= -Xms128m -Xmx512m -XX:PermSize=128m 修改为
set JAVA_MEM_OPTS=-Xms128m -Xmx512m
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| @Component @Slf4j public class IncSyncUserToEs {
@Resource private UserEsDao userEsDao;
private CanalConnector connector;
@PostConstruct public void start() { connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); new Thread(this::run).start(); }
public void run() { int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe("my_db.user"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; if (emptyCount > 3600) { emptyCount = 0; log.info("empty count : {}", emptyCount); } try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } else { emptyCount = 0; processEntries(message.getEntries()); }
connector.ack(batchId); } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } }
private void processEntries(List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; }
RowChange rowChange; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); }
EventType eventType = rowChange.getEventType(); for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.INSERT) { syncToEs(rowData.getAfterColumnsList(), "insert"); } else if (eventType == EventType.UPDATE) { syncToEs(rowData.getAfterColumnsList(), "update"); } else if (eventType == EventType.DELETE) { syncToEs(rowData.getBeforeColumnsList(), "delete"); } } } }
private void syncToEs(List<Column> columns, String eventType) { if ("insert".equals(eventType) || "update".equals(eventType)) { Map<String, Object> columnMap = getColumnMap(columns); UserEsDTO userEsDTO = BeanUtil.copyProperties(columnMap, UserEsDTO.class); log.info( "{}前的数据总数: {}", eventType, userEsDao.count()); userEsDao.save(userEsDTO); log.info( "{}后的数据总数: {}", eventType, userEsDao.count()); } else if ("delete".equals(eventType)) { long id = Long.parseLong(getColumnValue(columns, "id")); log.info( "{}前的数据总数: {}", eventType, userEsDao.count()); userEsDao.deleteById(id); log.info( "{}后的数据总数: {}", eventType, userEsDao.count()); } }
private Map<String, Object> getColumnMap(List<Column> columns) { Map<String, Object> map = new HashMap<>(); for (Column column : columns) { map.put(column.getName(), column.getValue()); } return map; }
private String getColumnValue(List<Column> columns, String columnName) { for (Column column : columns) { if (column.getName().equals(columnName)) { return column.getValue(); } } return null; }
@PreDestroy public void stop() { connector.disconnect(); } }
|
- 实现es快速搜索
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Override public Page<User> searchFromEs(UserQueryRequest userQueryRequest) { Long id = userQueryRequest.getId(); String userName = userQueryRequest.getUserName(); String userProfile = userQueryRequest.getUserProfile(); long current = userQueryRequest.getCurrent() - 1; long pageSize = userQueryRequest.getPageSize(); String sortField = userQueryRequest.getSortField(); String sortOrder = userQueryRequest.getSortOrder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.filter(QueryBuilders.termQuery("isDelete", 0)); if (id != null) { boolQueryBuilder.filter(QueryBuilders.termQuery("id", id)); } if (StringUtils.isNotBlank(userName)) { boolQueryBuilder.should(QueryBuilders.matchQuery("userName", userName)); boolQueryBuilder.should(QueryBuilders.matchQuery("userProfile", userName)); boolQueryBuilder.minimumShouldMatch(1); } SortBuilder<?> sortBuilder = SortBuilders.scoreSort(); if (StringUtils.isNotBlank(sortField)) { sortBuilder = SortBuilders.fieldSort(sortField); sortBuilder.order(CommonConstant.SORT_ORDER_ASC.equals(sortOrder) ? 
SortOrder.ASC : SortOrder.DESC); } PageRequest pageRequest = PageRequest.of((int) current, (int) pageSize); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQueryBuilder) .withPageable(pageRequest).withSorts(sortBuilder).build(); SearchHits<UserEsDTO> searchHits = elasticsearchRestTemplate.search(searchQuery, UserEsDTO.class); Page<User> page = new Page<>(); page.setTotal(searchHits.getTotalHits()); List<User> resourceList = new ArrayList<>(); if (searchHits.hasSearchHits()) { List<SearchHit<UserEsDTO>> searchHitList = searchHits.getSearchHits(); List<Long> userIdList = searchHitList.stream().map(searchHit -> searchHit.getContent().getId()) .collect(Collectors.toList()); List<User> userList = baseMapper.selectBatchIds(userIdList); if (userList != null) { Map<Long, List<User>> idUserMap = userList.stream().collect(Collectors.groupingBy(User::getId)); userIdList.forEach(userId -> { if (idUserMap.containsKey(userId)) { resourceList.add(idUserMap.get(userId).get(0)); } else { String delete = elasticsearchRestTemplate.delete(String.valueOf(userId), UserEsDTO.class); log.info("delete user {}", delete); } }); } } page.setRecords(resourceList); return page; }
|
关键词高亮
在es搜索中增加高亮配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Highlight matches in userName and userProfile by wrapping them in a yellow <span>.
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("userName");
highlightBuilder.field("userProfile");
// HTML-escape the stored text before the tags are inserted: the fragments are returned as markup,
// so unescaped user-controlled names/profiles would otherwise be rendered as HTML (XSS).
highlightBuilder.encoder("html");
highlightBuilder.preTags("<span style=\"background: yellow;\">");
highlightBuilder.postTags("</span>");
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(boolQueryBuilder)
        .withPageable(pageRequest)
        .withSorts(sortBuilder)
        .withHighlightBuilder(highlightBuilder)
        .build();
|
将高亮字段设置到返回数据中
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| User user = idUserMap.get(userId).get(0);
searchHits.forEach(searchHit -> { if (searchHit.getContent().getId().equals(userId)) { Map<String, List<String>> highlightFields = searchHit.getHighlightFields(); if (highlightFields.containsKey("userName")) { user.setUserName(String.join("", highlightFields.get("userName"))); } if (highlightFields.containsKey("userProfile")) { user.setUserProfile(String.join("", highlightFields.get("userProfile"))); } } }); resourceList.add(user);
|
例子: 返回值如下 搜索关键词 “旅游”
压力测试
压测配置：1 秒内启动 100 个线程，请求体如下：
1 2 3 4
| { "searchText": "鱼皮", "type": "post" }
|
下拉联想词
二次更新：先对搜索词进行分词，再根据分词结果获取推荐词
用户部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| DELETE user_v1
GET user_v1/_mapping
PUT user_v1 { "aliases": { "user": {} }, "mappings": { "properties": { "userName": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 64 }, "suggest": { "type": "completion", "analyzer": "ik_max_word" } } }, "userProfile": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 }, "suggest": { "type": "completion", "analyzer": "ik_max_word" } } }, "userAvatar": { "type": "keyword" }, "createTime": { "type": "date" }, "updateTime": { "type": "date" }, "isDelete": { "type": "keyword" } } } }
|
- 定义接口实现搜索建议方法 : CustomUserEsRepository
1 2 3
/**
 * Custom query contract for search suggestions on the user index.
 */
public interface CustomUserEsRepository {

    /**
     * Suggests completions for the text typed so far.
     *
     * @param prefix the partial input to complete
     * @return the suggested completions
     */
    List<String> suggest(String prefix);
}
|
- 实现接口 : CustomUserEsRepositoryImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class CustomUserEsRepositoryImpl implements CustomUserEsRepository { @Autowired private RestHighLevelClient client;
@Override public List<String> suggest(String prefix) { Set<String> suggestions = new HashSet<>(); suggestions.addAll(suggestField("userName.suggest", prefix)); suggestions.addAll(suggestField("userProfile.suggest", prefix)); return new ArrayList<>(suggestions); }
private List<String> suggestField(String field, String prefix) { List<String> suggestions = new ArrayList<>(); try { SearchRequest searchRequest = new SearchRequest("user_v1"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SuggestBuilder suggestBuilder = new SuggestBuilder(); CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(field).prefix(prefix).size(10); suggestBuilder.addSuggestion("suggest_" + field, completionSuggestionBuilder); searchSourceBuilder.suggest(suggestBuilder); searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); searchResponse.getSuggest().getSuggestion("suggest_" + field) .getEntries().forEach(entry -> { entry.getOptions().forEach(option -> { suggestions.add(option.getText().string()); }); }); } catch (IOException e) { e.printStackTrace(); } return suggestions; } }
|
帖子部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| DELETE post_v1
GET post_v1/_mapping
PUT post_v1 { "aliases": { "post": {} }, "mappings": { "properties": { "title": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 }, "suggest": { "type": "completion", "analyzer": "ik_max_word" } } }, "content": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "tags": { "type": "keyword" }, "userId": { "type": "keyword" }, "createTime": { "type": "date" }, "updateTime": { "type": "date" }, "isDelete": { "type": "keyword" } } } }
|
- 定义接口实现搜索建议方法 : CustomPostEsRepository
1 2 3
/**
 * Custom query contract for search suggestions on the post index.
 */
public interface CustomPostEsRepository {

    /**
     * Suggests completions for the text typed so far.
     *
     * @param prefix the partial input to complete
     * @return the suggested completions
     */
    List<String> suggest(String prefix);
}
|
- 实现接口 : CustomPostEsRepositoryImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class CustomPostEsRepositoryImpl implements CustomPostEsRepository { @Autowired private RestHighLevelClient client;
@Override public List<String> suggest(String prefix) { Set<String> suggestions = new HashSet<>(); suggestions.addAll(suggestField("title.suggest", prefix)); return new ArrayList<>(suggestions); }
private List<String> suggestField(String field, String prefix) { List<String> suggestions = new ArrayList<>(); try { SearchRequest searchRequest = new SearchRequest("user_v1"); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SuggestBuilder suggestBuilder = new SuggestBuilder(); CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(field).prefix(prefix).size(10); suggestBuilder.addSuggestion("suggest_" + field, completionSuggestionBuilder); searchSourceBuilder.suggest(suggestBuilder); searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); searchResponse.getSuggest().getSuggestion("suggest_" + field) .getEntries().forEach(entry -> { entry.getOptions().forEach(option -> { suggestions.add(option.getText().string()); }); }); } catch (IOException e) { e.printStackTrace(); } return suggestions; } }
|
开发总结
WebMvcConfigurer和WebMvcConfigurationSupport(MVC配置)-CSDN博客
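同时使用 WebMvcConfigurer 和 WebMvcConfigurationSupport 时，由于 Spring Boot 的 WebMvcAutoConfiguration 只在容器中不存在 WebMvcConfigurationSupport 类型的 Bean 时才生效，WebMvcConfigurationSupport 会使 Spring Boot 的 MVC 自动配置失效，因此 WebMvcConfigurer 中的配置可能不会生效。建议只使用其中一种方式进行配置（在 Spring Boot 中通常实现 WebMvcConfigurer 即可），以避免冲突和意外行为。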
已实现
分页
下拉联想词提示
用户登录|注册|更新
待实现