Spring5 框架新功能(Webflux)

1、SpringWebflux 介绍

2、响应式编程(Java 实现)

3、响应式编程(Reactor 实现)

1、SpringWebflux 介绍

(1)SpringWebflux 是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似,Webflux 使用当前一种比较流行的响应式编程出现的框架。

(2)使用传统 web 框架,比如 SpringMVC,是基于 Servlet 容器,而Webflux 是一种异步非阻
塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现
* 异步和同步
* 非阻塞和阻塞
** 上面都是针对对象不一样
** 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同
** 阻塞和非阻塞针对被调用者,被调用者受到请求之后,做完请求任务之后才给出反馈就是阻
(4)Webflux 特点
第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求
(5)比较 SpringMVC
第一 两个框架都可以使用注解方式,都运行在 Tomcat 等容器中
第二 SpringMVC 采用命令式编程,Webflux 采用异步响应式编程

2、响应式编程(Java 实现)

(2)Java8 及其之前版本
* 提供的观察者模式两个类 Observer 和 Observable
public class ObserverDemo extends Observable {

    public static void main(String[] args) {
        ObserverDemo observer = new ObserverDemo();
        observer.addObserver((o, arg) ->{

        observer.addObserver((o, arg) -> {



java9中,Observer 和 Observable被Flow替代,而Flow是真正的响应式编程

public final class Flow {

    private Flow() {} // uninstantiable

     * A producer of items (and related control messages) received by
     * Subscribers.  Each current {@link Subscriber} receives the same
     * items (via method {@code onNext}) in the same order, unless
     * drops or errors are encountered. If a Publisher encounters an
     * error that does not allow items to be issued to a Subscriber,
     * that Subscriber receives {@code onError}, and then receives no
     * further messages.  Otherwise, when it is known that no further
     * messages will be issued to it, a subscriber receives {@code
     * onComplete}.  Publishers ensure that Subscriber method
     * invocations for each subscription are strictly ordered in <a
     * href="package-summary.html#MemoryVisibility"><i>happens-before</i></a>
     * order.
     * <p>Publishers may vary in policy about whether drops (failures
     * to issue an item because of resource limitations) are treated
     * as unrecoverable errors.  Publishers may also vary about
     * whether Subscribers receive items that were produced or
     * available before they subscribed.
     * @param <T> the published item type
    public static interface Publisher<T> {
         * Adds the given Subscriber if possible.  If already
         * subscribed, or the attempt to subscribe fails due to policy
         * violations or errors, the Subscriber's {@code onError}
         * method is invoked with an {@link IllegalStateException}.
         * Otherwise, the Subscriber's {@code onSubscribe} method is
         * invoked with a new {@link Subscription}.  Subscribers may
         * enable receiving items by invoking the {@code request}
         * method of this Subscription, and may unsubscribe by
         * invoking its {@code cancel} method.
         * @param subscriber the subscriber
         * @throws NullPointerException if subscriber is null
        public void subscribe(Subscriber<? super T> subscriber);

     * A receiver of messages.  The methods in this interface are
     * invoked in strict sequential order for each {@link
     * Subscription}.
     * @param <T> the subscribed item type
    public static interface Subscriber<T> {
         * Method invoked prior to invoking any other Subscriber
         * methods for the given Subscription. If this method throws
         * an exception, resulting behavior is not guaranteed, but may
         * cause the Subscription not to be established or to be cancelled.
         * <p>Typically, implementations of this method invoke {@code
         * subscription.request} to enable receiving items.
         * @param subscription a new subscription
        public void onSubscribe(Subscription subscription);

         * Method invoked with a Subscription's next item.  If this
         * method throws an exception, resulting behavior is not
         * guaranteed, but may cause the Subscription to be cancelled.
         * @param item the item
        public void onNext(T item);

         * Method invoked upon an unrecoverable error encountered by a
         * Publisher or Subscription, after which no other Subscriber
         * methods are invoked by the Subscription.  If this method
         * itself throws an exception, resulting behavior is
         * undefined.
         * @param throwable the exception
        public void onError(Throwable throwable);

         * Method invoked when it is known that no additional
         * Subscriber method invocations will occur for a Subscription
         * that is not already terminated by error, after which no
         * other Subscriber methods are invoked by the Subscription.
         * If this method throws an exception, resulting behavior is
         * undefined.
        public void onComplete();

     * Message control linking a {@link Publisher} and {@link
     * Subscriber}.  Subscribers receive items only when requested,
     * and may cancel at any time. The methods in this interface are
     * intended to be invoked only by their Subscribers; usages in
     * other contexts have undefined effects.
    public static interface Subscription {
         * Adds the given number {@code n} of items to the current
         * unfulfilled demand for this subscription.  If {@code n} is
         * less than or equal to zero, the Subscriber will receive an
         * {@code onError} signal with an {@link
         * IllegalArgumentException} argument.  Otherwise, the
         * Subscriber will receive up to {@code n} additional {@code
         * onNext} invocations (or fewer if terminated).
         * @param n the increment of demand; a value of {@code
         * Long.MAX_VALUE} may be considered as effectively unbounded
        public void request(long n);

         * Causes the Subscriber to (eventually) stop receiving
         * messages.  Implementation is best-effort -- additional
         * messages may be received after invoking this method.
         * A cancelled subscription need not ever receive an
         * {@code onComplete} or {@code onError} signal.
        public void cancel();

     * A component that acts as both a Subscriber and Publisher.
     * @param <T> the subscribed item type
     * @param <R> the published item type
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {

    static final int DEFAULT_BUFFER_SIZE = 256;

     * Returns a default value for Publisher or Subscriber buffering,
     * that may be used in the absence of other constraints.
     * @implNote
     * The current value returned is 256.
     * @return the buffer size value
    public static int defaultBufferSize() {
        return DEFAULT_BUFFER_SIZE;

public class Main {

    public static void main(String[] args) {
        Flow.Publisher<String> publisher = subscriber -> {
            subscriber.onNext("1"); // 1
            subscriber.onError(new RuntimeException("出错")); // 2
            //  subscriber.onComplete();

        publisher.subscribe(new Flow.Subscriber<>() {
            public void onSubscribe(Flow.Subscription subscription) {

            public void onNext(String item) {

            public void onError(Throwable throwable) {

            public void onComplete() {
                System.out.println("publish complete");

3、响应式编程(Reactor 实现)

(1)响应式编程操作中,Reactor 是满足 Reactive 规范框架
(2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
(3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉


 (4)代码演示 Flux 和 Mono

第一步 引入依赖
第二步 编程代码
public class TestReator {

    public static void main(String[] args) {
        //just 方法直接声明
        Integer[] array = {1, 2, 3, 4};

        List<Integer> list = Arrays.asList(array);
        Stream<Integer> stream = list.stream();
* 错误信号和完成信号都是终止信号,不能共存的
* 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
* 如果没有错误信号,没有完成信号,表示是无限数据流
(6)调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触

* 对数据流进行一道道操作,成为操作符,比如工厂流水线
第一 map 元素映射为新元素
第二 flatMap 元素映射为流
* 把每个元素转换流,把转换之后多个流合并大的流

