CompletableFuture实现多线程查询

Published on
46 3~4 min
@Override
public PageInfo<Entity> queryTest(BasePage queryBo) {

    //构建存放并行CompletableFuture实例的集合
    ArrayList<CompletableFuture<List<Entity>>> futures = new ArrayList<>();

    //业务查询
    PageHelper.startPage(queryBo.getPageNum(), queryBo.getPageSize());
    List<Entity> list = testMapper.selectTestByCondition(queryBo);

    //流处理会导致分页字段失效,提前构建分页
    PageInfo<DgRiskStatisticV2VO> pageInfo = new PageInfo<>(list);
	
    //批量查询并行数
    int batchSize = 1;
    
    //循环分割集合并开启并行线程处理子业务查询
	for (int i = 0; i < list.size(); i = i + batchSize) {
        int endIndex = Math.min(i + batchSize, list.size());
        List<Entity> subList = list.subList(i, endIndex);
        CompletableFuture<List<Entity>> future = CompletableFuture.supplyAsync(() -> performQuery(subList));
        futures.add(future);
    }

    //使用allOf方法等待所有查询完成
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

    //所有查询完毕,合并所有并行线程结果
    CompletableFuture<List<Entity>> finalFutures = allFutures.thenApply(v -> futures.stream().map(CompletableFuture::join).flatMap(List::stream).collect(Collectors.toList()));

    //取出所有查询结果并返回
    List<Entity> result = finalFutures.join();
    pageInfo.setList(result);
    return pageInfo;
}
/**
 * 并行子查询
 *
 * @param parentList
 * @return newList
 */
private List<Entity> performQuery(List<Entity> parentList) {
    parentList.forEach(x -> {
       //子业务处理或查询
		...
    });
    return newList;
}