响应式编程是一个关注数据流和变化传播的异步编程范式。它使得通过所使用的编程语言来轻松的描述静态(e.g. array)和动态(e.g. event emitters)数据流成为可能。
Reactive Streams Specification定义了Java响应式编程的接口规范。这个规范已被整合进Java 9 (Flow类)。
在面向对象的编程语言中,响应式编程经常被做为观察者模式的扩展。
也有人拿响应式流与迭代器模式进行对比,因为在它们的类库中都有一个Iterable-Iterator对。它们最大的不同在于,迭代器采用的是pull模式,而响应式流采用的是push模式。
迭代器是命令式编程模式,由开发者决定何时调用next()方法来获取下一个元素;而响应式编程中国,与Iterable-Iterator对等价的是Publisher-Subscriber对,由Publisher通知Subscriber有新的元素要处理。
这个规范定义了如下接口:
org.reactivestreams.Publisher: 无界元素序列的提供者,根据Subscriber发布这些元素。一个Publisher可以服务于多个Subscriber
org.reactivestreams.Subscriber: 无界元素序列的消费者
org.reactivestreams.Subscription: 一个订阅到Publisher的Subscriber的一对一的生命周期
org.reactivestreams.Processor: Publisher和Subscriber都要尊从的契约
Publisher/Subscriber: 1/N
Subscriber/Publisher: 1/1
Subscription/Subscriber: 1/1
Reactor是一个在JVM平台上构建非阻塞应用程序(non-blocking application)的Reactive Streams Specification的实现。
它是Spring WebFlux的Reactor默认实现,其对外提供的主要类有Flux和Mono:
Flux<T>: public abstract class Flux<T> implements Publisher<T>, 0到N个元素的响应式流Mono<T>: public abstract class Mono<T> implements Publisher<T>, 0到1个元素的响应式流引入依赖:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.6.RELEASE</version>
</dependency>
最简单的构建方式:
//从数组构建
Flux.just("0", "1", "2", "3")
.subscribe(s -> log.info("Flux.just: {}", s));
Flux.fromArray(new String[]{"0", "1", "2", "3"})
.subscribe(s -> log.info("Flux.fromArray: {}", s));
List<String> strings = Arrays.asList("0", "1", "2", "3");
//从可迭代对象构建,如ArrayList
Flux.fromIterable(strings)
.subscribe(s -> log.info("Flux.fromIterable: {}", s));
//从Java 8 的Stream构建
Flux.fromStream(strings.stream())
.subscribe(s -> log.info("Flux.fromStream: {}", s));
//构建至多一个元素的流
Mono.just("0")
.subscribe(s -> log.info("Mono.just: {}", s));
String value = "0";
Mono.justOrEmpty(Optional.ofNullable(value))
.subscribe(s -> log.info("Mono.justOrEmpty: {}", s));
构建一个数字序列:
//构建一个排列
Flux.range(0, 4)
.subscribe(i -> log.info("Flux.range: {}", i));
//这个for循环实现了上面代码的功能
for (int i = 0; i < 4; i++) {
log.info("for: {}", i);
}
为序列分配线程执行,而不是在调用者线程中执行,即异步执行。
Reactor提供了如下Scheduler:
Schedulers.immediate(): 在当前线程执行Schedulers.single(): 可重用的单线程Schedulers.newSingle(): 每次调用都创建一个新线程Schedulers.elastic(): 在需要时创建线程池,重用空闲的线程,适用于I/O等阻塞式工作,类似Executors.newCachedThreadPool()Schedulers.parallel(): 固定大小(CPU核心数)的线程池为每个Flux分配一个线程
Flux.range(0, 10)
.publishOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
Flux.range(10, 10)
.subscribeOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
publishOn/subscribeOn: 都会使你的操作在单独的线程中执行
parallel(int)和runOn(Scheduler)方法帮助你真正的进行异步处理:
parallel(int)方法返回一个ParallelFlux实例,runOn(Scheduler)方法告诉ParallelFlux实例使用哪个Scheduler来执行任务。
Flux.range(0, 100)
.parallel(4)
.runOn(Schedulers.parallel())
.subscribe(i -> log.info("Flux.range: {}", i));
通过ConnectableFlux,可以向多个Subscriber发送广播
方案一:
publish()-->connect():
ConnectableFlux<Integer> flux = Flux.range(0, 4)
.doOnSubscribe(subscription -> log.info("doOnSubscribe: {}", subscription))
.publish();
flux.subscribe(i -> log.info("subscribe 1: {}", i));
flux.subscribe(i -> log.info("subscribe 2: {}", i));
flux.connect();
直到调用connect()方法,才会开始处理数据,即调用了connect()方法才会执行subscribe()方法中的代码
方案二:
publish()-->autoConnect():
Flux<Integer> flux = Flux.range(0, 4)
.doOnSubscribe(subscription -> log.info("doOnSubscribe: {}", subscription))
.publish().autoConnect(2);
flux.subscribe(i -> log.info("subscribe 1: {}", i));
flux.subscribe(i -> log.info("subscribe 2: {}", i));
使用autoConnect(int)方法时,直到subscribe()被调用N(autoConnect(int)的参数)次才会开始执行任务,且最多有N个订阅者
RxJava是另一个Reactive Streams Specification的实现。
引入依赖:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.1.12</version>
</dependency>
Flowable<T>: public abstract class Flowable<T> implements Publisher<T>, 0到N个元素的响应式流,支持Reactive-Streams和backpressure构建与Reactor类似,也是rang、just、fromXXX几个方法
RxJava提供了如下Scheduler:
Schedulers.computation(): 使用固定数量的专门的线程异步执行计算密集型任务Schedulers.io(): 执行I/O等阻塞式任务Schedulers.single(): 使用单个线程以FIFO的方式执行任务Schedulers.trampoline(): 用于测试示例:
Flowable.range(0,10)
.subscribeOn(Schedulers.computation())
.subscribe(s -> log.info("Schedulers.computation: {}",s));
Flowable.range(10,10)
.observeOn(Schedulers.computation())
.subscribe(s -> log.info("Schedulers.computation: {}",s));
subscribeOn/observeOn: 与Reactor的publishOn/subscribeOn类似
方案一:
Flowable.range(0, 10)
.flatMap(v -> Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> {
Integer r = w * w;
log.info("parallel processing: {} * {} = {}", w, w, r);
return r;
}))
.subscribe(r -> log.info("parallel processing: {}", r));
方案二:
Flowable.range(0, 10)
.parallel(4)
.runOn(Schedulers.computation())
.map(w -> {
Integer r = w * w;
log.info("parallel processing: {} * {} = {}", w, w, r);
return r;
})
.sequential()
.subscribe(r -> log.info("parallel processing: {}", r));
parallel(int)方法返回一个ParallelFlowable<T>。需要注意的是,parallel(int)和ParallelFlowable<T>都被标记为beta(@Beta),在未来的RxJava版本中有可能会变动。