Manjusaka

Manjusaka

Discussing the "Herd Effect" in Online Events

I actually started paying attention to the thundering herd problem last year. I then submitted a patch to CPython regarding the resolution of the selector thundering herd problem BPO-35517. Now, let's talk about the thundering herd problem a bit.

The Past of the Thundering Herd Problem#

What is the Thundering Herd Problem?#

The thundering herd problem, also known as the thundering herd effect, occurs when multiple processes or threads are waiting for the same event. When the event occurs, all threads and processes are awakened by the kernel. After waking up, usually only one process successfully acquires the event and processes it, while the other processes find that they failed to acquire the event and continue to wait, which reduces system performance to some extent.

Many people might wonder why the thundering herd effect consumes system resources and reduces system performance?

  1. The awakening of multiple processes/threads involves a context switching issue. Frequent context switching leads to data frequently circulating between registers and the run queue. In extreme cases, more time is consumed in scheduling processes/threads rather than executing them.

Next, let's discuss the common thundering herd problems we encounter in network programming.

Common Thundering Herd Problems#

In Linux, we commonly see the thundering herd effect when we use accept and the system-provided APIs like select, poll, or epoll to handle our network connections.

Thundering Herd with Accept#

First, let's review our traditional accept usage with a flowchart.

image

Here, there is a situation where, when a request arrives, all processes/threads start to accept, but ultimately only one succeeds. Let's write some code to see.

#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4

int worker_process(int listenfd, int i) {
  while (1) {
    printf("I am work %d, my pid is %d, begin to accept connections \n", i,
           getpid());
    struct sockaddr_in client_info;
    socklen_t client_info_len = sizeof(client_info);
    int connection =
        accept(listenfd, (struct sockaddr *)&client_info, &client_info_len);
    if (connection != -1) {
      printf("worker %d accept success\n", i);
      printf("ip :%s\t", inet_ntoa(client_info.sin_addr));
      printf("port: %d \n", client_info.sin_port);
    } else {
      printf("worker %d accept failed", i);
    }
    close(connection);
  }

  return 0;
}

int main() {
  int i = 0;
  struct sockaddr_in address;
  bzero(&address, sizeof(address));
  address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &address.sin_addr);
  address.sin_port = htons(SERVER_PORT);
  int listenfd = socket(PF_INET, SOCK_STREAM, 0);
  int ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
  ret = listen(listenfd, 5);
  for (i = 0; i < WORKER_COUNT; i++) {
    printf("Create worker %d\n", i + 1);
    pid_t pid = fork();
    /*child  process */
    if (pid == 0) {
      worker_process(listenfd, i);
    }
    if (pid < 0) {
      printf("fork error");
    }
  }

  /*wait child process*/
  int status;
  wait(&status);
  return 0;
}

Let's take a look at the running results.

image

Huh? What's going on? Why didn't we see the phenomenon we wanted (one process succeeds in accept, and three processes fail)? The reason is that since Linux 2.6, the thundering herd problem with accept has been handled at the kernel level.

Alright, let's move on.

Thundering Herd with Select/Poll/Epoll#

Taking epoll as an example, let's look at the traditional working mode.

image

Now, let's look at some code.

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10087
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  event.events = EPOLLIN;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

Then, we send a TCP request using telnet to see the effect, and we get the following result.

image

We can see that when a request arrives, all four processes are awakened. Now, to visualize this process more intuitively, let's profile it using strace.

image

We can still see that all four processes are awakened, but only Worker 3 successfully accepts, while the other processes received the EAGAIN error when trying to accept.

The Linux documentation describes EAGAIN as follows:

The socket is marked nonblocking and no connections are present to be accepted. POSIX.1-2001 and POSIX.1-2008 allow either error to be returned for this case, and do not require these constants to have the same value, so a portable application should check for both possibilities.

Now, do we have an intuitive understanding of the thundering herd problem with EPOLL? So how do we solve the thundering herd problem?

The Present of the Thundering Herd Problem#

Solving the Thundering Herd Problem from the Kernel#

As mentioned earlier, the thundering herd problem with accept has been resolved at the kernel level since Linux Kernel 2.6. But what about EPOLL? In January 2016, Linus, the father of Linux, submitted a patch to the kernel.

See epoll: add EPOLLEXCLUSIVE flag

The key code is:

		if (epi->event.events & EPOLLEXCLUSIVE)
			add_wait_queue_exclusive(whead, &pwq->wait);
		else
			add_wait_queue(whead, &pwq->wait);

In short, by adding an EPOLLEXCLUSIVE flag as an auxiliary. If the user enables EPOLLEXCLUSIVE, then when joining the kernel wait queue, add_wait_queue_exclusive is used; otherwise, add_wait_queue is used.

For the usage of these two functions, you can refer to this article Handling wait queues.

There is a description that states:

The add_wait_queue( ) function inserts a nonexclusive process in the first position of a wait queue list. The add_wait_queue_exclusive( ) function inserts an exclusive process in the last position of a wait queue list.

Now, let's modify our code (the kernel version must be Linux Kernel 4.5 or later).

#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define SERVER_ADDRESS "0.0.0.0"
#define SERVER_PORT 10086
#define WORKER_COUNT 4
#define MAXEVENTS 64

static int create_and_bind_socket() {
  int fd = socket(PF_INET, SOCK_STREAM, 0);
  struct sockaddr_in server_address;
  server_address.sin_family = AF_INET;
  inet_pton(AF_INET, SERVER_ADDRESS, &server_address.sin_addr);
  server_address.sin_port = htons(SERVER_PORT);
  bind(fd, (struct sockaddr *)&server_address, sizeof(server_address));
  return fd;
}

static int make_non_blocking_socket(int sfd) {
  int flags, s;
  flags = fcntl(sfd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl error");
    return -1;
  }
  flags |= O_NONBLOCK;
  s = fcntl(sfd, F_SETFL, flags);
  if (s == -1) {
    perror("fcntl set error");
    return -1;
  }
  return 0;
}

int worker_process(int listenfd, int epoll_fd, struct epoll_event *events,
                   int k) {
  while (1) {
    int n;
    n = epoll_wait(epoll_fd, events, MAXEVENTS, -1);
    printf("Worker %d pid is %d get value from epoll_wait\n", k, getpid());
    sleep(0.2);
    for (int i = 0; i < n; i++) {
      if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) ||
          (!(events[i].events & EPOLLIN))) {
        printf("%d\n", i);
        fprintf(stderr, "epoll err\n");
        close(events[i].data.fd);
        continue;
      } else if (listenfd == events[i].data.fd) {
        struct sockaddr in_addr;
        socklen_t in_len;
        int in_fd;
        in_len = sizeof(in_addr);
        in_fd = accept(listenfd, &in_addr, &in_len);
        if (in_fd == -1) {
          printf("worker %d accept failed\n", k);
          break;
        }
        printf("worker %d accept success\n", k);
        close(in_fd);
      }
    }
  }

  return 0;
}

int main() {
  int listen_fd, s;
  int epoll_fd;
  struct epoll_event event;
  struct epoll_event *events;
  listen_fd = create_and_bind_socket();
  if (listen_fd == -1) {
    abort();
  }
  s = make_non_blocking_socket(listen_fd);
  if (s == -1) {
    abort();
  }
  s = listen(listen_fd, SOMAXCONN);
  if (s == -1) {
    abort();
  }
  epoll_fd = epoll_create(MAXEVENTS);
  if (epoll_fd == -1) {
    abort();
  }
  event.data.fd = listen_fd;
  // add EPOLLEXCLUSIVE support
  event.events = EPOLLIN | EPOLLEXCLUSIVE;
  s = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
  if (s == -1) {
    abort();
  }
  events = calloc(MAXEVENTS, sizeof(event));
  for (int i = 0; i < WORKER_COUNT; i++) {
    printf("create worker %d\n", i);
    int pid = fork();
    if (pid == 0) {
      worker_process(listen_fd, epoll_fd, events, i);
    }
  }
  int status;
  wait(&status);
  free(events);
  close(listen_fd);
  return EXIT_SUCCESS;
}

Now, let's see the effect.

image

Huh? Why are there still two processes awakened? The reason is that EPOLLEXCLUSIVE only guarantees that the number of awakened processes is less than or equal to the number of processes we have started, and does not directly wake up all processes, nor does it guarantee that only one process is awakened.

Let's look at the official description:

Sets an exclusive wakeup mode for the epoll file descriptor that is being attached to the target file descriptor, fd. When a wakeup event occurs and multiple epoll file descriptors are attached to the same target file using EPOLLEXCLUSIVE, one or more of the epoll file descriptors will receive an event with epoll_wait(2). The default in this scenario (when EPOLLEXCLUSIVE is not set) is for all epoll file descriptors to receive an event. EPOLLEXCLUSIVE is thus useful for avoiding thundering herd problems in certain scenarios.

In other words, as it stands, the system cannot strictly guarantee the resolution of the thundering herd problem. Many times we still need to rely on the design of the application layer itself to solve it.

Application Layer Solutions#

Currently, there are two strategies for applications to solve the thundering herd problem:

  1. This is an acceptable cost, so we temporarily ignore it. This is our strategy most of the time.

  2. Solve this problem through locking or other means, with the most typical example being Nginx.

Let's see how Nginx solves this problem.

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    if (ngx_timer_resolution) {
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;
    }

    if (ngx_use_accept_mutex) {
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

            if (ngx_accept_mutex_held) {
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;

    (void) ngx_process_events(cycle, timer, flags);

    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    if (ngx_accept_mutex_held) {
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }

    if (delta) {
        ngx_event_expire_timers();
    }

    ngx_event_process_posted(cycle, &ngx_posted_events);
}

Here we can see that Nginx's main idea is to handle this problem through locking. Before each process listens for FD events, it first tries to acquire a global lock using ngx_trylock_accept_mutex. If it successfully acquires the lock, it then attempts to process events through ngx_process_events. If it fails to acquire the lock, it abandons the operation for that time. So, in a sense, for a particular FD, Nginx only allows one worker to handle events on that FD at the same time, thus avoiding the thundering herd problem.

Conclusion#

This article has been delayed for a long time since last year. The thundering herd problem is something we encounter in our daily work. I personally feel that it is necessary to write a detailed note to record some of the learning experiences from last year to now. That's about it, and I hope you find it helpful.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.