0%

Vertx和响应式编程

今天了解下听说了好久,但是没怎么用过的东西。

响应式介绍

可能有的同学对 Vertx 和响应式还不熟悉,按照惯例,先上定义。

响应式编程(Reactive Programming):

计算中,响应式编程反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

好,估计还是不懂,Wiki 上的定义一向晦涩难懂。

”响应式“这个概念其实是编程范式(Programming paradigm)里的“声明式编程”的子类,我们不妨对比一下,才能更好地理解什么才算是响应式编程。

  • 过程化/命令式编程:命令式编程的主要思想是关注计算机执行的步骤, 即一步一步告诉计算机先做什么再做什么。
  • 声明式编程:声明式编程是以数据结构的形式来表达程序执行的逻辑。 它的主要思想是告诉计算机应该做什么,但不指定具体要怎么做。

比如我们想在一个数字集合中筛选数字大于 6 的,对于命令式,可能是这么做:

1
for(int num=0;i<list.size();i++)
2
{
3
  if (num > 6){
4
    return num;
5
  }
6
}

而声明式编程中最典型的就是 SQL,我们想筛选同样的数据,对于 SQL 来说可能是下面这样:

1
SELECT num FROM list where num > 6 limit 1;

除了 SQL,网页编程中用到的 HTML 和 CSS 也都属于声明式编程。

命令式编程和声明式编程起源的不同决定这两大类范式代表着迥然不同的编程理念和风格:命令式编程是行动导向( Action-Oriented )的,因而算法是显性而目标是隐性的;声明式编程是目标驱动( Goal-Driven )的,因而目标是显性而算法是隐性的。

明确什么是“声明式”之后,我们在看什么叫”面向数据流(data stream)和变化传播(propagation of change)“。

与传统的处理方式相比,响应式能够基于数据流中的事件进行反应处理。举个例子:a+b=c的场景,在传统编程方式下如果 a、b 发生变化,那么我们需要重新计算 a+b 来得到 c 的新值。而反应式编程中,我们不需要重新计算,a、b 的变化事件会触发 c 的值自动更新。这种方式类似于我们在消息中间件中常见的发布/订阅模式或者观察者模式。由流发布事件,而我们的代码逻辑作为订阅方基于事件进行处理,并且是异步处理的。

观察者模式,或者可以叫发布-订阅模式,定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

反应式编程中,最基本的处理单元是事件流(事件流是不可变的,对流进行操作只会返回新的流)中的事件。流中的事件包括正常事件(对象代表的数据、数据流结束标识)和异常事件(异常对象,例如 Exception)。同时,只有当订阅者第一次发布者,发布者发布的事件流才会被消费,后续的订阅者只能从订阅点开始消费,但是我们可以通过背压、流控等方式控制消费。

常用的反应式编程实现类库包括:Reactor、RxJava 2、Akka Streams、Vert.x 以及 Ratpack 等等。

Vert.x

一个基于 JVM、轻量级、高性能的应用平台。

介绍

相信任何一个程序猿,对”高性能“的追求永远都不会停歇。Vertx 既然自称高性能,那么有多高的性能?参见下面这个 Benchmark。

Web Framework Benchmarks

这个测试非常全,基本覆盖了常见语言的常见框架,并且分为多种场景,每隔半年左右重新测试一遍更新数据。可以看到 Vertx 在 JVM 语言里面性能排名还是非常靠前的。但是前几名还是还是 C++、Rust 的天下。

image-20210602092204587

作为对比,Spring 大概在这个位置:

image-20210602094525978

当然,Vertx 不是一个大而全的框架,和 Spring 的定位不一样也不好直接对比。

Vertx 有如下几个特性:

  • 轻量级,核心包大小只有 650kB。
  • 速度快,基于全异步的 Netty,Netty 性能很强大家应该都了解,Vertx 基于 Netty 做了很多封装,保持高性能的同时增加了易用性。
  • 同时支持多种编程语言,比如 Java,JavaScript,Groovy,Ruby,Ceylon,Scala 和 Kotlin。
  • 极好的分布式开发支持。
  • 模块化。

不讲这些比较虚的,我们直接来看下 Vertx 的理念和玩法。

官方文档的 Introduction 其实就有比较详细的介绍,我在根据我的理解再复述补充一遍:

最开始的多线程

解决并发问题最经典的用法就是用多线程,多个线程可以存在于单个进程中,执行并发工作,并共享相同的内存空间。

Threads

大多数应用程序和服务开发框架都是基于这种多线程模式。表面上看,每个连接有一个线程的模式是很让人放心,因为开发人员可以依靠传统的命令式代码。但是实际上你还是要自己去规避在内存访问方面的风险。

多线程简单但是受限

当工作负荷增长到超过中等工作负荷时会发生什么?(比如C10k问题)

答案很简单:你的操作系统内核开始受到影响,每次请求期间就有太多的上下文切换。

Blocking I/O

你的一些线程会被阻塞,因为它们在等待 I/O 操作的完成,一些线程会准备处理 I/O 结果,一些线程会在做 CPU 密集型任务。

现代内核有非常好的调度器,但你不能指望它们像处理 5000 个线程那样容易。另外,线程并不便宜:创建一个线程需要几毫秒,一个新的线程会吃掉大约 1MB 的内存。

使用异步来增加扩展性和效率

当你使用异步 I/O 时,用更少的线程处理更多的并发连接是可能的。当一个 I/O 操作发生时,我们不是阻塞一个线程,而是转到另一个准备好的任务上,并在以后准备好时恢复最初的任务。

Vertx 使用事件循环来复用并发的工作负载。

Event loop

在事件循环上运行的代码不应该执行阻塞的 I/O 或冗长的处理。但如果你有这样的代码,也不用担心。Vertx 有工人线程和 API,可以在事件循环上处理回事件。

自行挑选最好的异步编程模型

想要玩好异步编程需要更多的努力。在 Vertx 的核心模块,支持回调和 promises/futures,后者是一种简单而优雅的异步操作链模型。

使用 RxJava 可以实现高级的反应式编程,如果你喜欢更接近于传统的命令式编程,Vertx 还可以提供一流的 Kotlin coroutines 支持。

Asynchronous programming

Vertx 支持许多异步编程模型:方便为每个问题选择最有效的方法。

Vertx 组件

上面只是对 Vertx 的设计和理念的简单介绍。想要了解 Vertx 我们可能还得熟悉他的几个基本概念和组。

Event Loop

Event Loop 顾名思义,就是事件循环的。在 Vertx 的生命周期内,会不断的轮询查询事件。

刚才也介绍了,传统的多线程编程模型,每个请求就 fork 一个新的线程对请求进行处理。这样的编程模型有实现起来比较简单,一个连接对应一个线程。但是如果有大量的请求需要处理,就需要 fork 出大量的线程进行处理,对于操作系统来说调度大量线程造成系统负载升高。

img

Event Loop 不断的轮训,获取事件然后安排上不同的 Handler 处理对应的 Event。

这里要注意的是为了保证程序的正常运行,event 必须是非阻塞的。否则就会造成 Event Loop 的阻塞,影响 Vertx 的表现。但是现实中的程序肯定不能保证都是非阻塞的,Vertx 也提供了相应的处理阻塞的方法的机制。

Verticle

Verticle 是由 Vertx 部署和运行的代码块。默认情况一个 Vertx 实例维护了 N(默认情况下 N = CPU核数 x 2)个 Event Loop 线程。Verticle 实例可使用任意 Vertx 支持的编程语言编写,而且一个简单的应用程序也可以包含多种语言编写的 Verticle。

可以将 Verticle 想成 Actor Model 中的 Actor。一个应用程序通常是由在同一个 Vertx 实例中同时运行的许多 Verticle 实例组合而成。不同的 Verticle 实例通过向 Event Bus 收发送消息来相互通信。

不了解 Actor 模型的同学可以参考 Golang 社区的一句话:不要通过共享内存来通信,而应该通过通信来共享内存。Actor 模型也是类似的理念,互相之间通过传递消息来共享数据,并且数据处理是单线程的,就不涉及到锁和竞争。

verticle

Verticle 其实可以分为几种:

  • Stardand Verticle 是最常用的一类 Verticle —— 它们永远运行在 Event Loop 线程上。
  • Worker Verticle 会运行在 Worker Pool 中的线程上。一个实例绝对不会被多个线程同时执行。

Stardand Verticle 在创建时被分配了到 Event Loop 线程上,并且启动方法是通过 Event Loop 调用的。当你从 Event Loop 中调用任何其他需要处理核心 API 的方法时,Vertx 会保证这些程序,在同一个 Event Loop 中执行。

这意味着 Vertx 保证 verticle 实例中的所有代码总是在同一个事件循环中执行(只要你不创建自己的线程并调用它!)。

所以我们可以将所有代码写成单线程,让 Vertx 来担心线程和扩展的问题。不用担心同步和易失性的问题了,而且也避免了许多其他的竞赛条件和死锁的情况,这些情况传统多线程应用开发时非常普遍。

Worker Verticle 和 Stardand Verticle 一样,但它是使用 Vertx Worker 线程池的线程来执行的,而不是使用事件循环。

Worker Verticle 是为调用阻塞代码而设计的,因为它们不会阻塞任何 Event Loop。

如果你不想使用工作线程来运行阻塞代码,你也可以在 Event Loop 中直接运行内联阻塞代码。

Vertx 不会由一个以上的线程同时执行 Worker Verticle 实例,但可以由不同的线程在不同时间执行。

Event bus

上面说了 Vertical 互相之间需要通过消息来贡献数据,而 Event bus 就是承载消息的消息总线。可能类比后端常用的 MQ 就更加容易理解了。

Event bus

Vertx 的使用原则

使用 Vertx 的时候要遵循基本原则:

Don’t call us, we’ll call you.

Vertx 的 API 基本都是事件驱动的,这意味着当 Vertx 中发生你感兴趣的事情时,Vertx 会通过向你发送事件来调用你。

事件有可能是:

  • 定时器启动了
  • 一些数据已经到达套接字上
  • 一些数据被从磁盘上读取
  • 发生了一个异常
  • 一个 HTTP 服务器收到了一个请求

等等,比如一个处理 http 请求事件的写法:

1
server.requestHandler(request -> {
2
  // This handler will be called every time an HTTP request is received at the server
3
  request.response().end("hello world!");
4
});

如果事件被触发,Vertx 会自动的进行异步调用。

不要阻塞!

除了极少数的例外(即一些以 Sync 结尾的文件系统操作),Vertx 中的所有 API 都不会阻塞调用线程。

如果可以立即提供一个结果,它就会立即返回,否则你通常会提供一个 handler,在一段时间后接收事件。

因为 Vertx 的 API 都不阻塞线程,所以可以使用 Vertx 只用少量的线程来处理大量的并发性。

前面也说了,传统线程模型通过创建大量的线程来处理请求,而 Vertx 通过在 Event Loop 中调用你的处理程序来应对并发请求。这其实就是并发模型中的反应器模式(Reactor Pattern)——以高性能著称的 Node.js 也是这种模式。

在一个标准的反应器实现中,有一个单一的事件循环线程,它在一个循环中运行,当所有的事件到达时,将它们传递给所有的处理程序。

单个线程的问题是它在任何时候都只能在单个核心上运行,所以如果你想让你的单线程 Reactor 应用(例如你的 Node.js 应用)在多核心服务器上扩展,你必须启动和管理许多不同的进程。

Vertx 的工作方式略有不同。每个 Vertx 实例不是一个单一的事件循环,而是维护几个事件循环。默认情况下,我们根据机器上可用的核心数量来选择数量,也可以自行设置。

也就是说一个 Vertx 进程可以在服务器上扩展,他们把这种模式称为多反应器模式,以区别于单线程的反应器模式。

黄金法则——不要阻塞事件循环

我们已经知道 Vertx 的 API 是非阻塞的,不会阻塞 Event Loop,但如果你在处理程序中阻塞 Event Loop,那就没有什么帮助了。

如果你这样做了,那么这个 Event Loop 在被阻塞的时候将不能做任何其他事情,应用程序就会完全停滞不前,所以不要这样做。

阻塞的例子包括:

  • Thread.sleep()

  • 在锁上等待

  • 等待一个 mutex 或监视器(如同步部分)。

  • 做一个长期的数据库操作并等待结果

  • 做一个复杂的计算,需要一些重要的时间。

  • 在一个循环中旋转

具体等待多长的时间算是阻塞?

如果你有一个单一的事件循环,并且你想每秒处理 10000 个 http 请求,那么很明显,每个请求的处理时间不能超过 0.1 毫秒,所以你不能阻塞超过这个时间。

如果你的应用程序没有反应,这可能是你在某个地方阻塞了一个事件循环的迹象。为了帮助你诊断这类问题,如果Vert.x检测到一个事件循环在一段时间内没有返回,它会自动记录警告。如果你在日志中看到类似这样的警告,那么你应该进行调查。

具体用法

Vertx 的原理搞清楚之后,使用就非常简单了。比如启动一个 http 服务器,只需要这么一点代码:

1
vertx.createHttpServer().requestHandler(request -> {
2
  request.response().end("Hello world");
3
}).listen(8080);

还可以用他封装好的 http client 工具:

1
WebClient client = WebClient.create(vertx);
2
3
// Send a GET request
4
client
5
  .get(8080, "myserver.mycompany.com", "/some-uri")
6
  .send()
7
  .onSuccess(response -> System.out
8
    .println("Received response with status code" + response.statusCode()))
9
  .onFailure(err ->
10
    System.out.println("Something went wrong " + err.getMessage()));

更多用法参考官方文档吧,写的比较详细就不赘述。

干掉回调地狱

在使用 Vertx 的异步无阻塞 API 时,如果我们要保证一系列操作的执行顺序,通常不能像一般的框架那样简单的依次调用,而是依次把要调用的方法放在前一个方法的事件处理函数中,用回调函数用的比较多的同事一定遇到过这种情况:

1
vertx.fileSystem().writeFile(filePath, buffer, write -> {
2
    if (write.succeeded()) {
3
        vertx.createNetClient().connect(1234, "localhost", connect -> {
4
            if (connect.succeeded()) {
5
                connect.result().sendFile(filePath, send -> {
6
                    connect.result().close(); // 关闭不再使用的Socket
7
                    if (send.succeeded()) {
8
                        vertx.fileSystem().copy(filePath, backupPath, copy -> {
9
                            if (copy.succeeded()) {
10
                                vertx.fileSystem().delete(filePath, delete -> {
11
                                    if (delete.succeeded()) {
12
                                        logger.info("Hello, callback hell.");
13
                                    } else {
14
                                        logger.error(delete.cause().getMessage());
15
                                    }
16
                                });
17
                            } else {
18
                                logger.error(copy.cause().getMessage());
19
                            }
20
                        });
21
                    } else {
22
                        logger.error(send.cause().getMessage());
23
                    }
24
                });
25
            } else {
26
                logger.error(connect.cause().getMessage());
27
            }
28
        });
29
    } else {
30
        logger.error(write.cause().getMessage());
31
    }
32
});

这段代码先是把一段内容写到一个新文件里,然后建立一个 TCP 连接把文件发过去,再把这个文件拷贝到另一个目录作为备份,最后把原文件删掉。回调函数一层层的嵌套,形成了这样的代码结构,这就是回调地狱

嵌套个两三层,其实还可以接受,如果业务流程比较长,这样的代码就很难看了。Vertx 提供了四种方法解决这个问题:

  • Future
  • Vert.x Rx
  • Vert.x Async
  • Kotlin coroutine
1
public interface Future<T> extends AsyncResult<T>, Handler<AsyncResult<T>> {
2
    // 用结果 result 将这个未完成的 AsyncResult<T> 设为成功
3
    void complete(T result);
4
    // 用异常 cause 将这个未完成的 AsyncResult<T> 设为失败
5
    void fail(Throwable cause);
6
    // 设置该 Future 对象被完成时应该调用的处理函数
7
    Future<T> setHandler(Handler<AsyncResult<T>> handler);
8
    // 其他方法
9
}

所以我们用 Future 改写上面的代码的话:

1
Future<Void> futureWrite = Future.future();
2
Future<NetSocket> futureConnect = Future.future();
3
Future<Void> futureSend = Future.future();
4
Future<Void> futureCopy = Future.future();
5
Future<Void> futureDelete = Future.future();
6
7
vertx.fileSystem().writeFile(filePath, buffer, futureWrite);
8
9
futureWrite.setHandler(ar -> {
10
    if (ar.succeeded()) {
11
        vertx.createNetClient().connect(1234, "localhost", futureConnect);
12
    } else {
13
        logger.error(ar.cause().getMessage());
14
    }
15
});
16
17
futureConnect.setHandler(ar -> {
18
    if (ar.succeeded()) {
19
        ar.result().sendFile(filePath, futureSend);
20
    } else {
21
        logger.error(ar.cause().getMessage());
22
    }
23
});
24
// ......

看起来效果还不错,整齐多了,代码也不会随着业务流程长度而无限制缩进了。不过这样还是存在两个问题:

  • 颠倒两个代码块的顺序,该程序仍然是可以运行的,这样一来没有顺序上的约束,很容易产生混乱的代码。
  • 异常处理存在大量重复代码。

好在 Future 还提供了一个用于链式调用的方法 compose,我们使用 Future 的 compose 方法再次重构这部分代码:

1
Future<Void> futureWrite = Future.future();
2
Future<NetSocket> futureConnect = Future.future();
3
Future<Void> futureSend = Future.future();
4
Future<Void> futureCopy = Future.future();
5
Future<Void> futureDelete = Future.future();
6
7
vertx.fileSystem().writeFile(filePath, buffer, futureWrite);
8
9
futureWrite.compose(v -> {
10
    vertx.createNetClient().connect(1234, "localhost", futureConnect);
11
}, futureConnect).compose(socket -> {
12
    socket.sendFile(filePath, futureSend);
13
}, futureSend).compose(v -> {
14
    futureConnect.result().close(); // 关闭不再使用的Socket
15
    vertx.fileSystem().copy(filePath, backupPath, futureCopy);
16
}, futureCopy).compose(v -> {
17
    vertx.fileSystem().delete(filePath, futureDelete);
18
}, futureDelete).setHandler(ar -> {
19
    if (ar.succeeded()) {
20
        logger.info("Hello, future compose!!!");
21
    } else {
22
        if (futureConnect.succeeded()) {
23
            futureConnect.result().close(); // 关闭不再使用的Socket
24
        }
25
        logger.error(ar.cause().getMessage());
26
    }
27
});

除了最后一个回调函数 ,前面的所有回调函数的参数并不是一个 AsyncResult<T> 对象,而是我们期望的结果,即一个类型为 T 的对象;也就是说每次 compose 只处理上一步成功的情况,失败的异常会被层层传递到最后一个回调函数处理——这就有点像传统的 try catch 结构。

总结

Vertx 优点

  • 性能强悍,始终处于 JVM 语言第一梯队。
  • 轻量级,云原生时代很适合跟容器搭配。
  • 跨语言,像跟 Kotlin 的协程搭配,可以写的很优雅

Vertx 缺点

  • 有一定的开发、学习成本,想要适应全异步的编程,既需要生态支持,也需要编程思维的转变。
  • 定位类似于脚手架而不是大而全的框架,如果需要应对的是复杂、业务规模庞大、不断变动和扩张的应用,那么 Spring 还是不可替代

参考资料:

响应式编程Wiki定义

编程范式:函数式编程&防御式编程&响应式编程&契约式编程&流式编程

Vertx-介绍及快速入门

回调地狱与 Future 对象https://gengteng.gitbooks.io/my-vertx-guide/content/chapter3/section5.html)