coldsmog开发笔记
JS 事件笔记
Ckeditor 上传WPS图片失败问题
Springboot
SpringBoot 统一异常处理
Springboot 引入外部jar包
Springboot 打成war包
Springboot 多环境配置
SpringBoot @Scope注解学习
Springboot 快速生成项目API文档
SpringCache 缓存
Spring jetcache 二级缓存
Springboot 按条件装配类
FastJson的JsonPath语法
正则表达式语法
Spring 路径匹配
Feign 基础数据操作
监控Feign调用metrics
Springboot feign的性能优化
Jackson 设置序列化属性
SpringBoot 集成 Spring Data Mongodb 操作 MongoDB
MongoDB 的一些注意事项
MongoDB 指令对比
Jackson 解析XML
Springboot Redis注册
SpringBoot RedisTemplate批量插入
Springboot 指标监控Micrometer
springboot validation 注解校验
springboot 引入配置
Springboot 静态文件处理
Springboot 导出csv文件
Springboot 事件驱动(发布/订阅模式)
Springboot 启动过程和扩展点
Springboot 优化停机上下线
Spring自动装配 - 干饭角度学习
Springboot ShardingJDBC
Springboot的重试
springboot 动态修改端口
Oracle
Oracle 中实现自增ID
Oracle 定时任务
Oracle 解锁临时表
Oracle 检查连接数
Oracle 表空间
Oracle 解释执行SQL
markdown作图(适用typora)
服务器压测
业务对象层和数据层
并发限流处理
中间件
Yarn的使用
Dubbo学习笔记-RPC扩展和本地Mock
Dubbo学习笔记-泛化实现进行mock
Redis缓存穿透,缓存击穿,缓存雪崩
Galera 集群说明
Pip 镜像
pip 使用
MySQL命令行
数据库缓存双写方案
Git相关操作
Redis 操作时间复杂度一览
nacos 杂记
mybatis 散记
shardingjdbc
一次线上事故排查发现的Caffeine缓存死锁问题
设计模式
重新讲讲单例模式和几种实现
更优雅地实现策略模式
Http-headers
Prometheus 杂散笔记
JAVA 散记
CompletableFuture
Gson、FastJson、Jackson、json-lib对比总结
jackson 时间的序列化踩坑
JVM
自定义注解
mysql类型和java类型 转换一览表
枚举维护一个Map<value, Enum>的映射
Java中String +、concat、StringBuilder、StringBuffer 性能对比
TraceId 使用
MySQL 多数据源处理
Mybatis-plus 流式查询
JAVA发送win 桌面通知
idea 启动项目失败非代码问题杂记
Lambda 简述
Arthas 使用笔记
一种链式更新数据的数据模式
Skywalking 新增中间件插件
Redission 使用
数据导出为图片
IDEA 的热重启
Netty 工具类
maven 插件
本文档使用 MrDoc 发布
-
+
首页
CompletableFuture
## 什么是CompletableFuture? CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过 回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。 -------------------------- ## 简单入门 ### 线程的通信 ``` // 主线程 CompletableFuture<String> completableFuture=new CompletableFuture<String>(); Runnable runnable=new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(getThreadName()+"执行子线程....."); //在子线程中完成主线程completableFuture的完成 completableFuture.complete("success"); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 子线程 Thread t1=new Thread(runnable); t1.start(); //主线程阻塞,等待完成后拿到值 String result=completableFuture.get(); System.out.println(getThreadName()+"1x: "+result); ``` ### 无返回值的异步任务 ``` CompletableFuture<Void> future=CompletableFuture.runAsync(new Runnable() { @Override public void run() { try { System.out.println(getThreadName()+"正在执行一个没有返回值的异步任务。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }); future.get(); System.out.println(getThreadName()+" end"); ``` ### 有返回值的异步任务 ``` CompletableFuture<String> future=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { System.out.println(getThreadName()+"正在执行一个有返回值的异步任务。"); TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "OK"; } }); String result = future.get(); System.out.println(getThreadName()+" ans = " + result); ``` --------------------------- ## 高级进阶 ### 带返回值的异步callback调用 `thenApply`返回一个新的 CompletionStage,当这个阶段正常完成时,将这个阶段的结果传给函数作为参数执行。 ``` CompletableFuture<String> init=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println(getThreadName() + "supplyAsync"); return "123"; } }); CompletableFuture<Integer> second = init.thenApply(number->{ System.out.println(getThreadName()+"thenApply1"); return Integer.parseInt(number); }); CompletableFuture<Integer> third = second.thenApply(number->{ System.out.println(getThreadName()+"thenApply2"); return number*2; }); System.out.println(getThreadName() + " => " + third.get()); ``` ### 不带返回值的异步callback调用 `thenAccept`本身不再返回任何值,适合用于多个callback函数的最后一步操作使用 ``` CompletableFuture<String> init=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { System.out.println(getThreadName() + "supplyAsync"); return "123"; } }); CompletableFuture<Integer> second = init.thenApply(number->{ System.out.println(getThreadName()+"thenApply1"); return Integer.parseInt(number); }); CompletableFuture<Integer> end = second.thenAccept(number->{ System.out.println(getThreadName()+"thenApply2"); return number*2; }); end.get(); System.out.println(getThreadName() + " => END" ); ``` ### 不带返回值也不接受参数的尾处理 ``` CompletableFuture.supplyAsync(()->{ ystem.out.println(getThreadName()+"supplyAsync: 一阶段任务"); return null; }).thenRun(()->{ System.out.println(getThreadName()+"thenRun: 收尾任务"); }).get(); ``` ### 合并两个有依赖关系的任务 ``` CompletableFuture<String> future1=CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "1"; } }); CompletableFuture<String>nestedResult = future1.thenCompose(value-> CompletableFuture.supplyAsync(()->{ return value+"2"; })); System.out.println(nestedResult.get()); ``` ### 合并两个没有依赖关系的任务 ``` CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public Double get() { return 1.5; } }); CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public Double get() { return 2.3; } }); CompletableFuture<Double> result= d1.thenCombine(d2, (number1,number2)->{ return number1+number2; }); System.out.println(result.get()); ``` ### 多任务都执行完毕[有数据] ``` CompletableFuture<Double> array[]=new CompletableFuture[3]; for ( int i = 0; i < 3; i++) { array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return Math.random(); } }); } //获取结果的方式一 // CompletableFuture.allOf(array).get(); // for(CompletableFuture<Double> cf:array){ // if(cf.get()>0.6){ // System.out.println(cf.get()); // } // } //获取结果的方式二,过滤大于指定数字,在收集输出 List<Double> rs= Stream.of(array).map(CompletableFuture::join).filter(number->number>0.6).collect(Collectors.toList()); System.out.println(rs); ``` ### 多任务都执行完毕[无数据] ``` List<String> line = new ArrayList(); List<CompletableFuture<>> totalLines = lines.stream() .map(productLine -> CompletableFuture.supplyAsync(() -> { //do something async })) .collect(Collectors.toList()); //线程聚合等待 CompletableFuture.allOf(totalLines.toArray(new CompletableFuture[0])).join(); ``` ### 多任务只要完成一个 ``` CompletableFuture<Double> array[]=new CompletableFuture[3]; for ( int i = 0; i < 3; i++) { array[i]=CompletableFuture.supplyAsync(new Supplier<Double>() { @Override public Double get() { return i; } }); } CompletableFuture<Object> result= CompletableFuture.anyOf(array); System.out.println(result.get()); ``` ---------------- ## 其余说明 ### 异常处理 ``` CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public Double get() { return 1.5; } }).exceptionally(ex->{ System.out.println(ex.getMessage()); return "发生 异常"+ex.getMessage(); }); System.out.println(task.get()); ``` ### 拦截所有正常异常返回值 ``` CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> "name") .handle((res, ex) -> { System.out.println(ex); return res; } ); System.out.println(task.get()); ``` ### 加入线程池 ``` @Configuration @Slf4j @EnableAsync public class ExecutorConfig { @Bean public Executor asyncExecutor(TaskExecutionProperties properties) { log.info("start async executor"); ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); // 配置核心线程数 threadPoolTaskExecutor.setCorePoolSize(properties.getPool().getCoreSize()); // 配置最大线程数 threadPoolTaskExecutor.setMaxPoolSize(properties.getPool().getMaxSize()); // 配置队列大小 threadPoolTaskExecutor.setQueueCapacity(ThreadPoolConstant.QUEUE_CAPACITY); // 配置线程池中线程的名称前缀 threadPoolTaskExecutor.setThreadNamePrefix(ThreadPoolConstant.THREAD_NAME_PREFIX); // HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务: // CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行; // AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常; // DiscardPolicy:丢弃当前将要加入队列的任务; // DiscardOldestPolicy:丢弃任务队列中最旧的任务; threadPoolTaskExecutor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } } 作者:arkMon 链接:https://juejin.cn/post/6914962224029106190 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 ``` 有余留接口,传入即可 ``` CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor); CompletableFuture.runAsync(Runnable runnable, Executor executor); ``` ## 参考来源 https://cloud.tencent.com/developer/article/1366581
寒烟濡雨
2022年10月23日 18:53
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
关于 MrDoc
觅思文档MrDoc
是
州的先生
开发并开源的在线文档系统,其适合作为个人和小型团队的云笔记、文档和知识库管理工具。
如果觅思文档给你或你的团队带来了帮助,欢迎对作者进行一些打赏捐助,这将有力支持作者持续投入精力更新和维护觅思文档,感谢你的捐助!
>>>捐助鸣谢列表
微信
支付宝
QQ
PayPal
Markdown文件
分享
链接
类型
密码
更新密码