CompletableFuture构建异步应用

Future接口的局限性

我们知道Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。
但是这些特性还不足以让你编写简洁的并发代码。
我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。
使用Future中提供的方法完成这样的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些。

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 等待Future集合中的所有任务都完成。
  • 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  • 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

店铺实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package world.ismyfree.async.demo01;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* @author :tanliyuan
* @date :Created in 2020/6/4 21:27
* @description :商店实体
* @version: 1.0
*/
public class Shop {
private Random random = new Random();
private String shopName;

public void setShopName(String shopName) {
this.shopName = shopName;
}

public String getShopName() {
return shopName;
}


public Shop(String shopName) {
this.shopName = shopName;
}


public double getPrice(String product) {
return calculatePrice(product);
}

public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

public static void delay() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

店铺测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package world.ismyfree.async.demo01;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
* @author :tanliyuan
* @date :Created in 2020/6/5 10:32
* @description
* @version: 1.0
*/
public class ShopTest {
/**
* 商店列表
*/
public static List<Shop> shops = Arrays.asList(
new Shop("BestShop"),
new Shop("MyShop"),
new Shop("YourShop"),
new Shop("SuperShop")
);
/**
* 定制多线程执行器
*/
private static Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
/**
* CompletableFuture版本获取价格
*
* @param product
* @return
*/
private static List<String> getPricesCompletetableFuture(String product) {
List<CompletableFuture<String>> pricesFuture = shops.stream().map(
s -> CompletableFuture.supplyAsync(
() -> String.format("%s在%s的价格是%s", product, s.getShopName(), s.getPrice(product)),
executor
)
).collect(Collectors.toList());
System.out.println("异步对象已经返回,等待耗时任务计算完成");
return pricesFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
/**
* 使用顺序流获取商品的价格
*
* @param product
* @return
*/
private List<String> getPricesStream(String product) {
return shops.stream().map(s -> {
return s.getShopName() + "的价格是" + s.getPrice(product);
}).collect(Collectors.toList());
}
/**
* 使用并发流获取商品的价格
*
* @param product
* @return
*/
private List<String> getPricesParallelStream(String product) {
return shops.parallelStream().map(s -> {
return s.getShopName() + "的价格是" + s.getPrice(product);
}).collect(Collectors.toList());
}
/**
* 做一些有意义的事情
*/
private static void doSomething() {
System.out.println("做一些其他有意义的事");
}

public static void main(String[] args) {
long start = System.currentTimeMillis();
//System.out.println(getPricesStream("Apples"));
//System.out.println(getPricesParallelStream("Apples"));
System.out.println(getPricesCompletetableFuture("Apples"));
long invocationTime = System.currentTimeMillis() - start;
System.out.printf("异步对象返回时间%d毫秒%n", invocationTime);
doSomething();
long retrievalTime = System.currentTimeMillis() - start;
System.out.printf("计算结果返回时间%d毫秒%n", retrievalTime);
}
}

持续更新中》》》》》

如果本文对你有所帮助,请赏我1个铜板买喵粮自己吃,您的支持是我最大的动力!!!