Asynchronous programming. Blocking I/O and non-blocking I/O

This is the first post in a series on asynchronous programming. The whole series tries to answer a simple question: "What is asynchrony?" When I first started digging into the question, I thought I knew what it was. It turned out that I didn't know the slightest thing about asynchrony. So let's find out!

Whole series:

In this post we will be talking about networking, but you can easily map it to other input/output (I/O) operations, for example, change sockets to file descriptors. The explanation is not tied to any specific programming language, although the examples will be given in Python (what can I say, I love Python!).

One way or another, when you have a question about blocking or non-blocking calls, you are most commonly dealing with I/O. The most frequent case in our age of information, microservices, and lambda functions is request processing. Imagine that you, dear reader, are a user of a web site, while your browser (or the application in which you are reading these lines) is a client. Somewhere in the depths of the Amazon cloud, there is a server that handles your incoming requests to generate the very lines that you are reading.

In order to start an interaction in such client-server communication, the client and the server must first establish a connection with each other. We will not go into the depths of the 7-layer model and the protocol stack involved in this interaction, as it can all easily be found on the Internet. What we need to understand is that on both sides (client and server) there are special connection points known as sockets. Both the client and the server must be bound to each other's sockets and listen to them to understand what the other side says on the opposite end of the wire.

In our communication the server is doing some kind of processing: it handles the request, converts markdown to HTML, or looks up where the images are.

(Figure: latency of events)

If you look at the ratio between CPU speed and network speed, the difference is a couple of orders of magnitude. It turns out that if our application spends most of its time on I/O, the processor is simply idle most of the time. This type of application is called I/O-bound. For applications that require high performance, this is a bottleneck, and that is what we will talk about next.
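To get a feel for just how lopsided that ratio is, here is a rough sketch; the sleep() stands in for a network round trip, and the numbers are illustrative rather than measurements:

```python
import time


def cpu_work() -> int:
    # a purely CPU-bound task: sum a million integers
    return sum(range(1_000_000))


def fake_network_call() -> None:
    # stand-in for a network round trip; ~100 ms is a typical WAN latency
    time.sleep(0.1)


start = time.perf_counter()
cpu_work()
cpu_ms = (time.perf_counter() - start) * 1000

start = time.perf_counter()
fake_network_call()
io_ms = (time.perf_counter() - start) * 1000

# during io_ms the process did nothing at all: that idle time is what
# makes an application "I/O-bound"
print(f"CPU work: {cpu_ms:.1f} ms, I/O wait: {io_ms:.1f} ms")
```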

There are two ways to organize I/O (I will give examples based on Linux): blocking and non-blocking.

Also, there are two types of I/O operations: synchronous and asynchronous.

Together they represent the possible I/O models.

Each of these I/O models has usage patterns that are advantageous for particular applications. Here I will demonstrate the difference between the two ways of organizing I/O.

Blocking I/O

With blocking I/O, when the client makes a connection request to the server, the socket processing that connection and the corresponding thread that reads from it are blocked until some data arrives. This data is placed in the network buffer until it has all been read and is ready for processing. Until the operation is complete, the server can do nothing but wait.

The simplest conclusion from this is that we cannot serve more than one connection within a single thread. By default, TCP sockets work in blocking mode.
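You can confirm that default from Python itself; socket.getblocking() is available since Python 3.7:

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(sock.getblocking())   # True: blocking is the default mode
print(sock.gettimeout())    # None: no timeout, so calls may block forever
sock.close()
```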

A simple example in Python. The client:

import socket
import sys
import time


def main() -> None:
    host = socket.gethostname()
    port = 12345

    # create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        while True:
            data = sys.argv[1].encode()
            sock.send(data)
            time.sleep(0.5)

if __name__ == "__main__":
    assert len(sys.argv) > 1, "Please provide message"
    main()

Here we send a message to the server every 500 ms in an endless loop. Imagine that this client-server communication consists of downloading a big file; it takes some time to finish.

And the server:

import socket


def main() -> None:
    host = socket.gethostname()
    port = 12345

    # create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # bind the socket to the port
        sock.bind((host, port))
        # listen for incoming connections
        sock.listen(5)
        print("Server started...")

        while True:
            conn, addr = sock.accept()  # accepting the incoming connection, blocking
            print('Connected by ' + str(addr))
            while True:
                data = conn.recv(1024)  # receiving data, blocking
                if not data:
                    break
                print(data)

if __name__ == "__main__":
    main()

I am running this in separate terminal windows with several clients as:

$ python client.py "client N"

And the server as:

$ python server.py

Here we just listen to the socket and accept incoming connections. Then we try to receive data from this connection.

In the above code, the server will essentially be blocked by a single client connection! If we run another client with another message, you will not see it. I highly recommend that you play with this example to understand what is happening.

What is going on here?

The send() method will try to send all the data to the server, while the socket buffer on the server side continues to receive it. When the read system call is made, the application is blocked and the context is switched to the kernel. The kernel initiates the read, and the data is transferred to the user-space buffer. When the buffer becomes empty, the kernel will wake up the process again to receive the next portion of data.

Now in order to handle two clients with this approach, we need to have several threads, i.e. to allocate a new thread for each client connection. We will get back to that soon.

Non-blocking I/O

However, there is also a second option: non-blocking I/O. The difference is obvious from the name: instead of blocking, any call returns immediately. Non-blocking I/O means that the request is queued right away and the function returns at once; the actual I/O is then processed at some later point.

By setting a socket to a non-blocking mode, you can effectively interrogate it. If you try to read from a non-blocking socket and there is no data, it will return an error code (EAGAIN or EWOULDBLOCK).
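In Python, these error codes surface as the BlockingIOError exception. A minimal sketch, using socket.socketpair() so that no real server is needed:

```python
import socket

# a pair of connected sockets, so no real server is needed
a, b = socket.socketpair()
a.setblocking(False)

try:
    a.recv(1024)            # nothing has been sent yet
except BlockingIOError as e:
    # e.errno is EAGAIN (equal to EWOULDBLOCK on Linux)
    print("no data yet, errno:", e.errno)

b.send(b"hello")
print(a.recv(1024))         # b'hello': data is ready, recv returns at once

a.close()
b.close()
```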

Actually, this kind of polling is a bad idea. If your program constantly cycles, polling the socket for data, it will consume expensive CPU time for nothing. This can be extremely inefficient because in many cases the application busy-waits until data is available, or tries to do other work while the command is executed in the kernel. A more elegant way to check whether data is readable is select().

Let us go back to our example, with the changes on the server side:

import select
import socket


def main() -> None:
    host = socket.gethostname()
    port = 12345

    # create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setblocking(0)
        # bind the socket to the port
        sock.bind((host, port))
        # listen for incoming connections
        sock.listen(5)
        print("Server started...")

        # sockets from which we expect to read
        inputs = [sock]
        outputs = []

        while inputs:
            # wait for at least one of the sockets to be ready for processing
            readable, writable, exceptional = select.select(inputs, outputs, inputs)

            for s in readable:
                if s is sock:
                    conn, addr = s.accept()
                    conn.setblocking(0)  # keep accepted sockets non-blocking too
                    inputs.append(conn)
                else:
                    data = s.recv(1024)
                    if data:
                        print(data)
                    else:
                        inputs.remove(s)
                        s.close()

if __name__ == "__main__":
    main()

Now if we run this code with more than one client, you will see that the server is not blocked by a single client and handles every message that arrives, as the printed output shows. Again, I suggest that you try this example yourself.

What's going on here?

Here the server does not wait for all the data to be written to the buffer. When we make a socket non-blocking by calling setblocking(0), it will never wait for an operation to complete. So when we call the recv method, control returns to the main thread immediately. The main mechanical difference is that send, recv, connect and accept can return without having done anything at all.

With this approach, we can perform multiple I/O operations on different sockets from the same thread concurrently. But since we don't know whether a socket is ready for an I/O operation, we would have to ask every socket the same question, essentially spinning in an infinite loop (this non-blocking but still synchronous approach is called I/O multiplexing).

To get rid of this inefficient loop, we need a readiness polling mechanism: we ask about the readiness of all sockets at once, and it tells us which ones are ready for a new I/O operation and which are not. When any of the sockets is ready, we perform the queued operations and then return to the blocked state, waiting for the sockets to become ready for the next I/O operation.

There are several readiness polling mechanisms. They differ in performance and detail, but usually the details are hidden "under the hood" and not visible to us.

Keywords to search:

Notifications:

  • Level Triggering (state)

  • Edge Triggering (state changed)

Mechanics:

  • select(), poll()

  • epoll(), kqueue()

  • EAGAIN, EWOULDBLOCK
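In Python, the selectors module from the standard library wraps these mechanisms and picks the best one available on the platform (epoll on Linux, kqueue on BSD/macOS), so you rarely need to call them directly. A small sketch, again using socketpair() instead of a real connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()   # epoll, kqueue or select: the best available
a, b = socket.socketpair()
a.setblocking(False)
sel.register(a, selectors.EVENT_READ)

print(sel.select(timeout=0))        # []: nothing to read yet

b.send(b"ping")
for key, mask in sel.select(timeout=1):
    print(key.fileobj.recv(1024))   # b'ping': the socket was reported readable

sel.close()
a.close()
b.close()
```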

Multitasking

Therefore, our goal is to manage multiple clients at once. How can we ensure multiple requests are processed at the same time?

There are several options:

Separate processes

The simplest and historically first approach is to handle each request in a separate process. This approach is satisfactory because we can use the same blocking I/O API. If a process suddenly fails, it will only affect the operations that are processed in that particular process and not any others.

The downside is complicated inter-process communication. Formally, the processes have almost nothing in common, and any non-trivial communication between them that we want to organize requires additional effort to synchronize access, etc. Also, at any moment there can be several processes that are just waiting for client requests, which is a waste of resources.

Let us see how this works in practice. As soon as the first process (the master process) starts, it spawns a set of worker processes. Each of them can accept requests on the same socket and wait for incoming clients. As soon as an incoming connection appears, one of the processes picks it up: it receives the connection, processes it from beginning to end, closes the socket, and becomes ready for the next request. Variations are possible: a process can be spawned for each incoming connection, or they can all be started in advance, etc. This may affect performance, but it is not so important for us now.
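A toy version of this model can be sketched as follows (Linux/macOS only, since it relies on os.fork(); each worker serves exactly one connection just to keep the demo finite, while real servers loop forever and manage their worker pool):

```python
import os
import socket

listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))   # port 0: let the OS pick a free port
listener.listen(5)
port = listener.getsockname()[1]

# prefork two workers; each blocks in accept() on the SAME listening socket,
# and the kernel hands every incoming connection to exactly one of them
for _ in range(2):
    if os.fork() == 0:            # child process (worker)
        conn, _ = listener.accept()
        with conn:
            conn.sendall(b"handled by pid %d" % os.getpid())
        os._exit(0)

# the parent plays the client: two connections, served by two different workers
for _ in range(2):
    with socket.create_connection(("127.0.0.1", port)) as c:
        print(c.recv(1024))
for _ in range(2):
    os.wait()                     # reap the workers
```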

Examples of such systems:

  • Apache mod_prefork;
  • FastCGI, for those who most often run PHP;
  • Phusion Passenger, for those who write in Ruby on Rails;
  • PostgreSQL.

Threads

Another approach is to use operating system (OS) threads. Within one process we can create several threads. Blocking I/O can still be used, because only one thread will be blocked.

Example:

import socket
import threading


def handler(client):
    while True:
        data = client.recv(1024)
        if not data:
            break
        print(data)

    client.close()

def main() -> None:
    host = socket.gethostname()
    port = 12345

    # create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # bind the socket to the port
        sock.bind((host, port))
        # listen for incoming connections
        sock.listen(5)
        print("Server started...")

        while True:
            client, addr = sock.accept()
            threading.Thread(target=handler, args=(client,)).start()

if __name__ == "__main__":
    main()

To check the number of threads in the server process, you can use the Linux ps command with the server process PID:

$ ps huH p <PID> | wc -l

The operating system manages the threads itself and is capable of distributing them between available CPU cores. Threads are lighter than processes. In essence, it means we can generate more threads than processes on the same system. We can hardly run 10,000 processes, but 10,000 threads can be easy. Not that it'll be efficient.

On the other hand, there is no isolation: if there is any crash, it may cause not just one particular thread but the whole process to crash. And the biggest difficulty is that the memory of the process where the threads run is shared by all of them. We have a shared resource, memory, which means access to it needs to be synchronized. Synchronizing access to shared memory is only the simplest case: there can also be, for example, a connection to the database, or a pool of database connections, shared by all the threads of the application that handle incoming connections. Synchronizing access to such third-party resources is difficult.

There are common synchronization problems:

  1. Deadlocks are possible during synchronization. A deadlock occurs when a process or thread enters a waiting state because the requested resource is held by another waiting process, which in turn is waiting for a resource held by yet another waiting process. For example, the following situation will cause a deadlock between two processes: process 1 requests resource B from process 2; resource B is locked while process 2 is running; process 2 requires resource A from process 1 to finish running; resource A is locked while process 1 is running.
  2. Lack of synchronization when there is competitive access to shared data. Roughly speaking, two threads change the data at the same time and corrupt it. Such applications are harder to debug, and not all errors appear at once. For instance, the well-known GIL in Python, the Global Interpreter Lock, is one of the simplest ways to make a multithreaded application: with the GIL, all the data structures, all the memory, are protected by just one semaphore for the entire process.

In the next post, we will be talking about cooperative multitasking and its implementations.

Original: https://www.internalpointers.com/post/introduction-thread-synchronization
Translated: 2022-07-29
License: CC BY-NC-SA