博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Akka2使用探索6(Futures)——实现并发和异步
阅读量:6293 次
发布时间:2019-06-22

本文共 16365 字,大约阅读时间需要 54 分钟。

hot3.png

Future用来获取某个并发操作的结果,这个结果可以同步(阻塞)或异步(非阻塞)的方式访问。

 

执行上下文

Future 需要一个ExecutionContext, 它与java.util.concurrent.Executor 很相像. 如果你在作用域内有一个 ActorSystem , 它可以用system.dispatcher()作 ExecutionContext。你也可以用ExecutionContext 伴生对象提供的工厂方法来将 ExecutorsExecutorServices 进行包裹, 或者甚至创建自己的实例.

//执行上下文可以自己指定线程池类型        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool())
Future的创建方法        Future
f1 = Futures.successful("f1", ec); Future
f2 = Futures.future(new Callable() { @Override Object call() { return "f2" } }, ec) Future
f3 = Futures.successful("f3", ActorSystem.create("test").dispatcher());
//使用actor的ask方法发送消息是也能创建一个Future
Future f4 = akka.pattern.Patterns.ask(actor, "msg", 1000 * 60)
函数式 Future
Akka 的 Future 有一些与Scala集合所使用的非常相似的 monadic 方法. 这使你可以构造出结果可以传递的‘管道’ 或 ‘数据流’.

map(对未来返回的结果进行处理)

Future以函数式风格工作的第一个方法是 map. 它需要一个函数来对Future的结果进行处理, 返回一个新的结果。map 方法的返回值是包含新结果的另一个 Future:

static void map() throws Exception {        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
f1 = Futures.successful("fof1o", ec); //map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果 Future
f2 = f1.map(new Mapper
() { public Integer apply(String s) { return s.length(); } }); //这里对未来f1返回的字符串计算其长度
//下面是非阻塞式,异步返回        f2.onComplete(new OnComplete
() { @Override public void onComplete(Throwable failure, Integer success) { System.out.println("f2返回结果:" + success + ",failure:" + failure); } }); f2.onSuccess(new OnSuccess
() { @Override public void onSuccess(Integer result) { System.out.println("f2返回结果:" + result); } }); f2.onFailure(new OnFailure() { @Override public void onFailure(Throwable failure) { System.out.println("f2返回failure:" + failure); } }); }

flatMap(对多个Future返回的结果进行处理)

static void flatMap() throws Exception {        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
f1 = Futures.successful("hello", ec); // Future
fr = f1.flatMap(new Mapper
>() { public Future
apply(final String s) { return Futures.future(new Callable
() { public Integer call() { return s.length(); } }, ec); } }); // System.out.println(Await.result(fr, Duration.create(5, "s"))); //阻塞式 } //对两个Future的结果处理 static void flatMap_concat2() throws Exception { final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); final Future
f1 = Futures.successful("hello", ec); final Future
f2 = Futures.successful("world", ec); //如果要对多个Future的结果进行处理,需要用flatMap //本例中对f1和f2返回的结果用空格连接成“hello world” Future
fr = f1.flatMap(new Mapper
>() { public Future
apply(final String s) { return f2.map(new Mapper
() { @Override public String apply(String v) { return s + " " + v; } }); } }); System.out.println(Await.result(fr, Duration.create(5, "s"))); //阻塞式 } //对三个Future的结果处理 static void flatMap_concat3() throws Exception { final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); final Future
f1 = Futures.successful("How", ec); final Future
f2 = Futures.successful("are", ec); final Future
f3 = Futures.successful("you", ec); //如果要对多个Future的结果进行处理,需要用flatMap //本例中对f1、f2、f3返回的结果用空格连接成“How are you” Future
fr = f1.flatMap(new Mapper
>() { public Future
apply(final String s) { return f2.flatMap(new Mapper
>() { @Override public Future
apply(final String s2) { return f3.map(new Mapper
() { @Override public String apply(String s3) { return s + " " + s2 + " " + s3; } }); } }); } }); /*用java写比较繁琐,用scala的话就简单多了 val future1 = for { a: String <- actor ? "How" // returns How b: String <- actor ? "are" // returns "are" c: String <- actor ? "you" // returns "you" } yield a + " " + b + "" + c*/ System.out.println(Await.result(fr, Duration.create(5, "s"))); //阻塞式 }

filter(对Future进行条件筛选)

static void filter() throws Exception {        ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
f1 = Futures.successful("fof1o", ec); Future
f2 = Futures.successful("fo", ec); //map的作用是:对Futrue:f1的返回结果进行处理,返回一个新的结果 Future
fs = f1.filter(Filter.filterOf(new Function
() { @Override public Boolean apply(String param) { return param.length() == 5; } })); System.out.println(Await.result(fs, Duration.create(5, "s"))); Future
ff = f2.filter(Filter.filterOf(new Function
() { @Override public Boolean apply(String param) { return param.length() == 5; } })); //不匹配的话会抛scala.MatchError异常 System.out.println(Await.result(ff, Duration.create(5, "s"))); }

组合Futures

如果Future的数目较多,用flatMap组合的话代码就过于复杂。可以使用sequencetraverse。

sequence(将 T[Future[A]] 转换为 Future[T[A]])

public static void sequence() throws Exception {        //将 T[Future[A]] 转换为 Future[T[A]]        //简化了用flatMap来组合        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        final Future
f1 = Futures.successful("How", ec); final Future
f2 = Futures.successful("are", ec); final Future
f3 = Futures.successful("you", ec); List
> futureList = new ArrayList
>(); futureList.add(f1); futureList.add(f2); futureList.add(f3); //这里将List
> 组合成一个Future:Future
> Future
> future = Futures.sequence(futureList, ec); Future
fr = future.map(new Mapper
, String>() { @Override public String apply(Iterable
parameter) { String result = ""; for (String s : parameter) { result += s + " "; } return result; } }); System.out.println(Await.result(fr, Duration.create(5, "s"))); }
traverse(将 T[A] 转换为 Future[T[A]])
public static void traverse() throws Exception {        //将 T[A] 转换为 Future[T[A]]        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Iterable
list = Arrays.asList("How", "are", "you"); //这里将List
组合成一个Future:Future
> ,对list中的每个元素做加工处理 Future
> future = Futures.traverse(list, new Function
>() { @Override public Future
apply(final String param) { return Futures.future(new Callable
() { @Override public String call() throws Exception { return param.toUpperCase(); } }, ec); } }, ec); Future
fr = future.map(new Mapper
, String>() { @Override public String apply(Iterable
parameter) { String result = ""; for (String s : parameter) { result += s + " "; } return result; } }); System.out.println(Await.result(fr, Duration.create(5, "s"))); }
fold(从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了))
public static void fold() throws Exception {        //fold从一个初始值开始递归地对Future序列进行处理(它将sequence和map操作合并成一步了)        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        final Future
f1 = Futures.successful("How", ec); final Future
f2 = Futures.successful("are", ec); final Future
f3 = Futures.successful("you", ec); List
> futureList = new ArrayList
>(); futureList.add(f1); futureList.add(f2); futureList.add(f3); //本例从初始值“Init”开始,递归地对futureList的返回值用"_"连接,返回“Init_How_are_you” Future
fr = Futures.fold("Init", futureList, new Function2
() { @Override public String apply(String arg1, String arg2) { System.out.println("arg1----" + arg1); //第一次为Init,第二次为Init_How ,第三次为Init_How_are System.out.println("arg2----" + arg2); //第一次为How ,第二次为are 第三次为you return arg1 + "_" + arg2; } }, ec); //如果futureList为空列表,则返回初始值“Init” System.out.println(Await.result(fr, Duration.create(5, "s"))); //结果为Init_How_are_you } reduce(如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了))
public static void reduce() throws Exception {        //如果不想从给定的初始值开始递归,而想从future序列的第一个开始,则用reduce(它将sequence和map合并成一步了)        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        final Future
f1 = Futures.successful("How", ec); final Future
f2 = Futures.successful("are", ec); final Future
f3 = Futures.successful("you", ec); List
> futureList = new ArrayList
>(); futureList.add(f1); futureList.add(f2); futureList.add(f3); //本例从初始值“How”开始,递归地对futureList的返回值用"_"连接,返回“How_are_you” Future
fr = Futures.reduce(futureList, new Function2
() { @Override public String apply(String arg1, String arg2) { System.out.println("arg1----" + arg1); //第一次为How ,第二次为How_are System.out.println("arg2----" + arg2); //第一次为are ,第二次为you return arg1 + "_" + arg2; } }, ec); //如果futureList为空列表,则返回初始值“Init” System.out.println(Await.result(fr, Duration.create(5, "s"))); //结果为Init_How_are_you }

andThen(由于回调的执行是无序的,而且可能是并发执行的, 当你需要一组有序操作的时候需要一些技巧。)

public static void andThen() throws Exception {        //如果要对Future的结果分多次依次处理,需要使用andThen        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
future = Futures.successful("hello", ec).andThen(new OnComplete
() { @Override public void onComplete(Throwable failure, String success) { System.out.println("先收到:" + success); } }).andThen(new OnComplete
() { @Override public void onComplete(Throwable failure, String success) { System.out.println("又收到:" + success); } }).andThen(new OnSuccess
>() { @Override public void onSuccess(Either
result) { System.out.println("收到onSuccess:" + result); } }); }
fallbackTo(将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值)
public static void fallbackTo() throws Exception {        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
f1 = Futures.failed(new RuntimeException("ex1"), ec); Future
f2 = Futures.failed(new RuntimeException("ex2"), ec); Future
f3 = Futures.successful("ok", ec); //fallbackTo 将两个 Futures 合并成一个新的 Future, 如果第一个Future失败了,它将持有第二个 Future 的成功值 Future
fr = f1.fallbackTo(f2).fallbackTo(f3); System.out.println(Await.result(fr, Duration.create(5, "s"))); }

zip(操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果)

public static void zip() throws Exception {        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        Future
f1 = Futures.future(new Callable
() { @Override public String call() throws Exception { System.out.println("f1---" + Thread.currentThread().getName()); Thread.sleep(1000 * 10); return "hello"; } }, ec); Future
f2 = Futures.future(new Callable
() { @Override public String call() throws Exception { System.out.println("f2---" + Thread.currentThread().getName()); Thread.sleep(1000 * 5); return "world"; } }, ec); //zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果 Future
fr = f1.zip(f2).map(new Mapper
, String>() { @Override public String apply(Tuple2
ziped) { System.out.println("zip---" + Thread.currentThread().getName()); return ziped._1() + " " + ziped._2(); //f1和f2的返回结果包含在zipped对象中 } }); System.out.println("主线程----" + Thread.currentThread().getName()); System.out.println(Await.result(fr, Duration.create(15, "s"))); } public static void zip2() throws Exception { final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool()); Future
f1 = Futures.successful("hello", ec); Future
f2 = Futures.future(new Callable
() { @Override public String call() throws Exception { System.out.println("f2---" + Thread.currentThread().getName()); Thread.sleep(1000 * 10); return (1 / 0) + ""; } }, ec); //zip操作将两个 Futures 组合压缩成一个新的Future,返回的新的Future持hold一个tuple实例,它包含二者成功的结果 Future
fr = f1.zip(f2).map(new Mapper
, String>() { @Override public String apply(Tuple2
ziped) { System.out.println("zip----" + Thread.currentThread().getName()); return ziped._1() + " " + ziped._2(); //f1和f2的返回结果包含在zipped对象中 } }); System.out.println("主线程----" + Thread.currentThread().getName()); System.out.println(Await.result(fr, Duration.create(15, "s"))); }

recover(对Future的异常进行处理,相当于try..catch中对捕获异常后的处理)

public static void recover() throws Exception {        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        //recover对Future的异常进行处理,相当于try..catch中对捕获异常后的处理        Future
future = Futures.future(new Callable
() { public Integer call() { return 1 / 0; } }, ec).recover(new Recover
() { public Integer recover(Throwable problem) throws Throwable { System.out.println("捕获到异常:" + problem);// if (problem instanceof RuntimeException) {
// return 0;// } else {
// throw problem;// } return -2; //这里捕获到异常后直接返回新值了,并没有再抛出异常,所以后面的recover不会再收到异常 } }).recover(new Recover
() { public Integer recover(Throwable problem) throws Throwable { System.out.println("捕获到异常:" + problem); if (problem instanceof ArithmeticException) { //捕获异常并处理,捕获到后,后面得到的result将会是-1 return -1; } else { throw problem; } } }); int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); System.out.println("result----" + result); }

recoverWith(和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理)

public static void recoverWith() throws Exception {        final ExecutionContextExecutorService ec = ExecutionContexts.fromExecutorService(Executors.newCachedThreadPool());        //recoverWith和recover很类似,只是捕获到异常后返回Future,使其能够异步并发处理        Future
future = Futures.future(new Callable
() { public Integer call() { return 1 / 0; } }, ec).recoverWith(new Recover
>() { @Override public Future
recover(Throwable failure) throws Throwable { if (failure instanceof ArithmeticException) { return Futures.future(new Callable
() { @Override public Integer call() throws Exception { return 0; } }, ec); } else throw failure; } }); int result = Await.result(future, Duration.create(1, TimeUnit.SECONDS)); System.out.println("result----" + result); }

转载于:https://my.oschina.net/xiefeifeihu/blog/81117

你可能感兴趣的文章
从源码剖析useState的执行过程
查看>>
地包天如何矫正?
查看>>
中间件
查看>>
Android SharedPreferences
查看>>
css面试题
查看>>
Vue组建通信
查看>>
用CSS画一个带阴影的三角形
查看>>
前端Vue:函数式组件
查看>>
程鑫峰:1.26特朗.普力挺美元力挽狂澜,伦敦金行情分析
查看>>
safari下video标签无法播放视频的问题
查看>>
01 iOS中UISearchBar 如何更改背景颜色,如何去掉两条黑线
查看>>
对象的继承及对象相关内容探究
查看>>
Spring: IOC容器的实现
查看>>
Serverless五大优势,成本和规模不是最重要的,这点才是
查看>>
Nginx 极简入门教程!
查看>>
iOS BLE 开发小记[4] 如何实现 CoreBluetooth 后台运行模式
查看>>
Item 23 不要在代码中使用新的原生态类型(raw type)
查看>>
为网页添加留言功能
查看>>
JavaScript—数组(17)
查看>>
Android 密钥保护和 C/S 网络传输安全理论指南
查看>>