news 2026/4/2 19:31:38

Linux网络编程之负载均衡策略

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Linux网络编程之负载均衡策略

第一部分:什么情况下需要多个监听fd?

1.1 单监听fd的局限性

// 单监听fd的典型架构 socket_t server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM); sock_bind(server_fd, "0.0.0.0", 8080); sock_listen(server_fd, 128); ​ // 所有客户端都连接到同一个fd // 问题: // 1. 单一瓶颈点 // 2. 无法充分利用多核CPU // 3. 单个队列可能成为性能瓶颈 // 4. 无法实现连接隔离

1.2 需要多个监听fd的场景

场景原因解决方案
CPU多核优化单个进程无法利用多核多监听fd + 多进程/线程
负载均衡单个队列压力大多队列负载均衡
协议分离不同协议需要不同处理不同端口监听
连接隔离业务分区需求按用户/区域分区
容错冗余单点故障风险多监听fd热备
IPv4/IPv6双栈同时支持两种协议分别监听

第二部分:多监听fd的架构模式

2.1 SO_REUSEPORT模式(推荐)

/**
 * @file multi_listen_reuseport.c
 * @brief Multiple listening fds via SO_REUSEPORT.
 *
 * Supported since Linux 3.9: several sockets may bind the same
 * address/port, and the kernel load-balances incoming connections
 * across them.
 */

#include "socket.h"
#include "reactor.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

#define WORKER_COUNT 4    // number of worker processes

/**
 * @brief Worker-process body: create a private reactor and a private
 *        listening socket bound (with SO_REUSEPORT) to the shared port,
 *        then run the event loop forever.
 *
 * @param worker_id  index of this worker, passed to the accept callback
 * @param port       TCP port all workers bind
 */
static void worker_process(int worker_id, uint16_t port) {
    printf("Worker %d started (PID: %d)\n", worker_id, getpid());

    // Each worker owns its own reactor (edge-triggered).
    reactor_t reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE);

    // Listening socket — every worker binds the SAME port.
    socket_t server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);

    // Key step: SO_REUSEPORT lets all workers share the port.
    sock_set_reuseport(server_fd, true);
    sock_set_reuseaddr(server_fd, true);

    // Bind to the shared port.
    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", port, &addr);
    sock_bind(server_fd, &addr);

    // Start listening; non-blocking so the edge-triggered reactor
    // never stalls in accept.
    sock_listen(server_fd, 128);
    sock_set_nonblock(server_fd, true);

    // Register with the reactor. NOTE(review): on_accept is defined
    // elsewhere; worker_id is smuggled through the user-data pointer.
    reactor_add(reactor, server_fd, REACTOR_EVENT_READ,
                on_accept, (void*)(long)worker_id);

    printf("Worker %d listening on port %d\n", worker_id, port);

    // Event loop — never returns.
    while (1) {
        reactor_run(reactor, 1000);
    }
}

/**
 * @brief Master process: fork WORKER_COUNT workers, then wait on them.
 */
int main(int argc, char* argv[]) {
    uint16_t port = 8080;

    printf("Master process (PID: %d) starting %d workers...\n",
           getpid(), WORKER_COUNT);

    // Fork the worker processes.
    for (int i = 0; i < WORKER_COUNT; i++) {
        pid_t pid = fork();
        if (pid == 0) {
            // Child: become a worker (worker_process never returns
            // normally; exit(0) is defensive).
            worker_process(i, port);
            exit(0);
        } else if (pid < 0) {
            // NOTE(review): exit(1) here leaves already-forked
            // children running without a master — acceptable for a
            // demo, worth handling in production.
            perror("fork");
            exit(1);
        }
        printf("Started worker %d with PID: %d\n", i, pid);
    }

    // Master blocks until all children exit.
    for (int i = 0; i < WORKER_COUNT; i++) {
        wait(NULL);
    }
    return 0;
}

SO_REUSEPORT的优势

  • 内核级负载均衡

  • 连接均匀分配到不同进程

  • 减少锁竞争

  • 更好的CPU缓存局部性

2.2 主从Reactor模式

/** * @file master_slave_reactor.c * @brief 主从Reactor模式 * * 主Reactor只处理accept * 从Reactor处理IO */ ​ #include "socket.h" #include "reactor.h" #include <pthread.h> ​ #define SLAVE_COUNT 4 // 从Reactor数量 ​ /* 从Reactor上下文 */ typedef struct { reactor_t reactor; pthread_t thread; int id; int event_count; } slave_reactor_t; ​ /* 全局上下文 */ typedef struct { socket_t server_fd; reactor_t master_reactor; slave_reactor_t slaves[SLAVE_COUNT]; int next_slave; // 下一个分配的从Reactor common_mutex_t lock; } server_context_t; ​ /** * @brief 从Reactor线程函数 */ static void* slave_thread_func(void* arg) { slave_reactor_t* slave = (slave_reactor_t*)arg; printf("Slave reactor %d started\n", slave->id); // 创建从Reactor slave->reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE); // 运行事件循环 while (1) { int processed = reactor_run(slave->reactor, 1000); slave->event_count += processed; // 每10秒打印统计 static time_t last_print = 0; time_t now = time(NULL); if (now - last_print >= 10) { printf("Slave %d: processed %d events total\n", slave->id, slave->event_count); last_print = now; } } return NULL; } ​ /** * @brief 接受连接并分配到从Reactor */ static void on_master_accept(reactor_t reactor, int fd, reactor_event_t events, void* user_data) { server_context_t* ctx = (server_context_t*)user_data; if (events & REACTOR_EVENT_READ) { // 接受新连接 socket_t client_fd = sock_accept(ctx->server_fd, NULL, 0); if (client_fd == SOCKET_INVALID) { return; } sock_set_nonblock(client_fd, true); // 选择从Reactor(轮询) common_mutex_lock(&ctx->lock); int slave_id = ctx->next_slave; ctx->next_slave = (ctx->next_slave + 1) % SLAVE_COUNT; common_mutex_unlock(&ctx->lock); slave_reactor_t* slave = &ctx->slaves[slave_id]; // 创建连接上下文 client_context_t* client_ctx = common_calloc(1, sizeof(client_context_t)); client_ctx->sock = client_fd; client_ctx->slave_id = slave_id; // 添加到从Reactor(需要线程间通信) // 实际实现需要使用管道或eventfd通知从Reactor add_client_to_slave(slave, client_fd, client_ctx); printf("Accepted client fd=%d, assigned to slave %d\n", client_fd, slave_id); } } 
/**
 * @brief Initialize and run the master/slave server.
 *
 * Creates the listening socket, the master reactor, and SLAVE_COUNT
 * slave threads, then runs the master event loop forever (the final
 * `return 0` is unreachable in normal operation).
 *
 * @param port  TCP port to listen on
 * @return -1 if a slave thread could not be created; otherwise does
 *         not return.
 */
int init_master_slave_server(uint16_t port) {
    // NOTE(review): ctx is heap-allocated and never freed — acceptable
    // for a run-forever server, but there is no shutdown path.
    server_context_t* ctx = common_calloc(1, sizeof(server_context_t));

    // Create and configure the listening socket.
    ctx->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(ctx->server_fd, true);

    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", port, &addr);
    sock_bind(ctx->server_fd, &addr);
    sock_listen(ctx->server_fd, 128);
    sock_set_nonblock(ctx->server_fd, true);

    // Master reactor handles only accepts (level-triggered).
    ctx->master_reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL);

    // Register the accept callback with the whole context as user data.
    reactor_add(ctx->master_reactor, ctx->server_fd,
                REACTOR_EVENT_READ, on_master_accept, ctx);

    // Round-robin state.
    ctx->lock = common_mutex_create();
    ctx->next_slave = 0;

    // Spawn the slave threads.
    // NOTE(review): each slave creates its own reactor inside its
    // thread, so an accept arriving before a slave has initialized
    // could hand a client to a not-yet-created reactor — confirm that
    // add_client_to_slave tolerates this, or create the reactors here
    // before starting the threads.
    for (int i = 0; i < SLAVE_COUNT; i++) {
        ctx->slaves[i].id = i;
        ctx->slaves[i].event_count = 0;

        if (pthread_create(&ctx->slaves[i].thread, NULL,
                           slave_thread_func, &ctx->slaves[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
    }

    printf("Master-slave server started on port %d\n", port);
    printf("Master reactor handling accepts, %d slaves handling IO\n",
           SLAVE_COUNT);

    // Master event loop — never returns.
    while (1) {
        reactor_run(ctx->master_reactor, 1000);
    }
    return 0;
}

2.3 端口分区模式

/**
 * @file port_partition.c
 * @brief Port-partition pattern.
 *
 * Different businesses listen on different ports, each served by its
 * own reactor thread. Example: HTTP:8080, WebSocket:8081, API:8082.
 */

#include "socket.h"
#include "reactor.h"
#include <pthread.h>

/* Static configuration for one port partition. */
typedef struct {
    uint16_t port;
    const char* name;
    reactor_callback_t handler;
} port_config_t;

/* Runtime state for one port partition. */
typedef struct {
    reactor_t reactor;
    socket_t server_fd;
    port_config_t config;
    pthread_t thread;
} port_partition_t;

/**
 * @brief HTTP accept handler: accept, serve one request, close.
 *        (handle_http_request is defined elsewhere.)
 */
static void http_handler(reactor_t reactor, int fd,
                         reactor_event_t events, void* user_data) {
    if (events & REACTOR_EVENT_READ) {
        socket_t client_fd = sock_accept(fd, NULL, 0);
        if (client_fd != SOCKET_INVALID) {
            printf("HTTP connection accepted on fd=%d\n", client_fd);
            // Short-lived connection: handle and close immediately.
            handle_http_request(client_fd);
            sock_close(client_fd);
        }
    }
}

/**
 * @brief WebSocket accept handler: accept, perform the handshake, and
 *        on success keep the connection registered for long-lived IO.
 */
static void websocket_handler(reactor_t reactor, int fd,
                              reactor_event_t events, void* user_data) {
    if (events & REACTOR_EVENT_READ) {
        socket_t client_fd = sock_accept(fd, NULL, 0);
        if (client_fd != SOCKET_INVALID) {
            printf("WebSocket connection accepted on fd=%d\n", client_fd);
            // WebSocket handshake; long connection on success.
            if (websocket_handshake(client_fd)) {
                reactor_add(reactor, client_fd, REACTOR_EVENT_READ,
                            on_websocket_data, NULL);
            } else {
                sock_close(client_fd);
            }
        }
    }
}

/**
 * @brief Thread body for one port partition: create reactor and
 *        listening socket for the configured port, then loop forever.
 *
 * @param arg  port_partition_t* with .config already filled in
 */
static void* port_partition_thread(void* arg) {
    port_partition_t* partition = (port_partition_t*)arg;

    printf("Starting %s server on port %d\n",
           partition->config.name, partition->config.port);

    // Per-partition reactor (level-triggered).
    partition->reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL);

    // Listening socket bound to this partition's port.
    partition->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(partition->server_fd, true);

    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", partition->config.port, &addr);
    sock_bind(partition->server_fd, &addr);
    sock_listen(partition->server_fd, 128);
    sock_set_nonblock(partition->server_fd, true);

    // Register the partition's business handler.
    reactor_add(partition->reactor, partition->server_fd,
                REACTOR_EVENT_READ, partition->config.handler, NULL);

    // Event loop — never returns.
    while (1) {
        reactor_run(partition->reactor, 1000);
    }
    return NULL;
}

/**
 * @brief Start one thread per configured port partition and join them.
 *
 * @return -1 if a thread could not be created; otherwise joins forever
 *         (the threads never exit, so the cleanup below is unreachable
 *         in practice).
 */
int start_port_partition_server(void) {
    // Port partition table: port / name / handler.
    port_config_t configs[] = {
        {8080, "HTTP", http_handler},
        {8081, "WebSocket", websocket_handler},
        {8082, "API", http_handler},      // REST API
        {8083, "Admin", http_handler}     // admin interface
    };

    int config_count = sizeof(configs) / sizeof(configs[0]);
    port_partition_t* partitions = common_calloc(config_count,
                                                 sizeof(port_partition_t));

    // Launch each partition on its own thread.
    for (int i = 0; i < config_count; i++) {
        partitions[i].config = configs[i];

        if (pthread_create(&partitions[i].thread, NULL,
                           port_partition_thread, &partitions[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
        printf("Started %s server on port %d\n",
               configs[i].name, configs[i].port);
    }

    // Block on all partition threads.
    for (int i = 0; i < config_count; i++) {
        pthread_join(partitions[i].thread, NULL);
    }

    common_free(partitions);
    return 0;
}

第三部分:按片区管理的具体实现

3.1 基于地理位置的片区管理

/** * @file geo_partition.c * @brief 基于地理位置的片区管理 * * 根据不同地区用户分配到不同的服务器组 */ ​ #include <netinet/in.h> #include <arpa/inet.h> ​ /* 地理片区定义 */ typedef enum { REGION_ASIA = 0, REGION_EUROPE, REGION_AMERICA, REGION_COUNT } geo_region_t; ​ /* 片区配置 */ typedef struct { const char* cidr; // IP段 CIDR表示法 geo_region_t region; } ip_region_t; ​ /* IP到片区的映射配置 */ static ip_region_t ip_regions[] = { {"1.0.0.0/8", REGION_ASIA}, // 亚太 {"14.0.0.0/8", REGION_ASIA}, {"27.0.0.0/8", REGION_ASIA}, {"36.0.0.0/8", REGION_ASIA}, {"49.0.0.0/8", REGION_ASIA}, {"2.0.0.0/8", REGION_EUROPE}, // 欧洲 {"5.0.0.0/8", REGION_EUROPE}, {"31.0.0.0/8", REGION_EUROPE}, {"37.0.0.0/8", REGION_EUROPE}, {"46.0.0.0/8", REGION_EUROPE}, {"62.0.0.0/8", REGION_EUROPE}, {"77.0.0.0/8", REGION_EUROPE}, {"78.0.0.0/8", REGION_EUROPE}, {"79.0.0.0/8", REGION_EUROPE}, {"80.0.0.0/8", REGION_EUROPE}, {"81.0.0.0/8", REGION_EUROPE}, {"82.0.0.0/8", REGION_EUROPE}, {"83.0.0.0/8", REGION_EUROPE}, {"84.0.0.0/8", REGION_EUROPE}, {"85.0.0.0/8", REGION_EUROPE}, {"86.0.0.0/8", REGION_EUROPE}, {"87.0.0.0/8", REGION_EUROPE}, {"88.0.0.0/8", REGION_EUROPE}, {"89.0.0.0/8", REGION_EUROPE}, {"90.0.0.0/8", REGION_EUROPE}, {"91.0.0.0/8", REGION_EUROPE}, {"3.0.0.0/8", REGION_AMERICA}, // 美洲 {"4.0.0.0/8", REGION_AMERICA}, {"6.0.0.0/8", REGION_AMERICA}, {"7.0.0.0/8", REGION_AMERICA}, {"8.0.0.0/8", REGION_AMERICA}, {"9.0.0.0/8", REGION_AMERICA}, {"11.0.0.0/8", REGION_AMERICA}, {"12.0.0.0/8", REGION_AMERICA}, {"13.0.0.0/8", REGION_AMERICA}, {"15.0.0.0/8", REGION_AMERICA}, {"16.0.0.0/8", REGION_AMERICA}, {"17.0.0.0/8", REGION_AMERICA}, {"18.0.0.0/8", REGION_AMERICA}, {"19.0.0.0/8", REGION_AMERICA}, {"20.0.0.0/8", REGION_AMERICA}, {"21.0.0.0/8", REGION_AMERICA}, {"22.0.0.0/8", REGION_AMERICA}, {"23.0.0.0/8", REGION_AMERICA}, {"24.0.0.0/8", REGION_AMERICA}, {"25.0.0.0/8", REGION_AMERICA}, {"26.0.0.0/8", REGION_AMERICA}, {"28.0.0.0/8", REGION_AMERICA}, {"29.0.0.0/8", REGION_AMERICA}, {"30.0.0.0/8", REGION_AMERICA}, {"32.0.0.0/8", REGION_AMERICA}, 
{"33.0.0.0/8", REGION_AMERICA}, {"34.0.0.0/8", REGION_AMERICA}, {"35.0.0.0/8", REGION_AMERICA}, {"38.0.0.0/8", REGION_AMERICA}, {"40.0.0.0/8", REGION_AMERICA}, {"44.0.0.0/8", REGION_AMERICA}, {"45.0.0.0/8", REGION_AMERICA}, {"47.0.0.0/8", REGION_AMERICA}, {"48.0.0.0/8", REGION_AMERICA}, {"50.0.0.0/8", REGION_AMERICA}, {"52.0.0.0/8", REGION_AMERICA}, {"54.0.0.0/8", REGION_AMERICA}, {"56.0.0.0/8", REGION_AMERICA}, {"63.0.0.0/8", REGION_AMERICA}, {"64.0.0.0/8", REGION_AMERICA}, {"65.0.0.0/8", REGION_AMERICA}, {"66.0.0.0/8", REGION_AMERICA}, {"67.0.0.0/8", REGION_AMERICA}, {"68.0.0.0/8", REGION_AMERICA}, {"69.0.0.0/8", REGION_AMERICA}, {"70.0.0.0/8", REGION_AMERICA}, {"71.0.0.0/8", REGION_AMERICA}, {"72.0.0.0/8", REGION_AMERICA}, {"73.0.0.0/8", REGION_AMERICA}, {"74.0.0.0/8", REGION_AMERICA}, {"75.0.0.0/8", REGION_AMERICA}, {"76.0.0.0/8", REGION_AMERICA}, }; ​ /** * @brief 检查IP是否在CIDR范围内 */ static bool ip_in_cidr(uint32_t ip, const char* cidr) { struct in_addr network, mask; char network_str[INET_ADDRSTRLEN]; int prefix_len; // 解析CIDR "1.0.0.0/8" if (sscanf(cidr, "%[^/]/%d", network_str, &prefix_len) != 2) { return false; } if (inet_pton(AF_INET, network_str, &network) != 1) { return false; } // 计算掩码 uint32_t mask_value = prefix_len ? 
(~0 << (32 - prefix_len)) : 0; mask.s_addr = htonl(mask_value); // 检查IP是否在范围内 return (ip & mask.s_addr) == (network.s_addr & mask.s_addr); } ​ /** * @brief 根据客户端IP确定片区 */ static geo_region_t get_client_region(const char* client_ip) { struct in_addr addr; if (inet_pton(AF_INET, client_ip, &addr) != 1) { return REGION_ASIA; // 默认 } uint32_t ip = addr.s_addr; int region_count = sizeof(ip_regions) / sizeof(ip_regions[0]); for (int i = 0; i < region_count; i++) { if (ip_in_cidr(ip, ip_regions[i].cidr)) { return ip_regions[i].region; } } return REGION_ASIA; // 默认 } ​ /** * @brief 片区感知的accept处理 */ static void on_geo_accept(reactor_t reactor, int fd, reactor_event_t events, void* user_data) { if (events & REACTOR_EVENT_READ) { sock_addr_t client_addr; socket_t client_fd = sock_accept(fd, &client_addr, 0); if (client_fd != SOCKET_INVALID) { // 获取客户端IP char ip_str[INET_ADDRSTRLEN]; sock_addr_str(&client_addr, ip_str, sizeof(ip_str)); // 确定片区 geo_region_t region = get_client_region(ip_str); // 根据片区分配到不同的工作队列 assign_to_region_queue(client_fd, region, ip_str); printf("Client %s assigned to region %d\n", ip_str, region); } } }

3.2 基于业务类型的片区管理

/**
 * @file business_partition.c
 * @brief Business-type partition management.
 *
 * Each business type gets its own listening fd, port, limits, and
 * handler, served by a dedicated reactor thread.
 */

/* Business types. */
typedef enum {
    BUSINESS_WEB = 0,    // web requests
    BUSINESS_API,        // API calls
    BUSINESS_WS,         // WebSocket
    BUSINESS_STREAM,     // streaming media
    BUSINESS_BATCH,      // batch processing
    BUSINESS_COUNT
} business_type_t;

/* Static configuration for one business partition. */
typedef struct {
    uint16_t base_port;            // base listening port
    int max_connections;           // connection cap
    int timeout_ms;                // idle timeout (0 = none)
    const char* name;              // business name
    reactor_callback_t handler;    // business accept handler
} business_partition_t;

/* Global per-business configuration table.
 * NOTE(review): the handlers are defined elsewhere in the project. */
static business_partition_t business_configs[BUSINESS_COUNT] = {
    [BUSINESS_WEB] = {
        .base_port = 8000,
        .max_connections = 10000,
        .timeout_ms = 30000,
        .name = "Web",
        .handler = web_request_handler
    },
    [BUSINESS_API] = {
        .base_port = 8100,
        .max_connections = 5000,
        .timeout_ms = 10000,
        .name = "API",
        .handler = api_request_handler
    },
    [BUSINESS_WS] = {
        .base_port = 8200,
        .max_connections = 2000,
        .timeout_ms = 0,           // long-lived connections, no timeout
        .name = "WebSocket",
        .handler = websocket_handler
    },
    [BUSINESS_STREAM] = {
        .base_port = 8300,
        .max_connections = 1000,
        .timeout_ms = 60000,
        .name = "Stream",
        .handler = stream_handler
    },
    [BUSINESS_BATCH] = {
        .base_port = 8400,
        .max_connections = 100,
        .timeout_ms = 300000,      // 5 minutes
        .name = "Batch",
        .handler = batch_handler
    }
};

/* Runtime state for one business server. */
typedef struct {
    business_type_t type;
    reactor_t reactor;
    socket_t server_fd;
    int current_connections;    // guarded by lock
    pthread_t thread;
    common_mutex_t lock;
} business_server_t;

/**
 * @brief Thread body for one business partition: bind the configured
 *        port (with SO_REUSEPORT), register the handler, loop forever.
 *
 * @param arg  business_server_t* with .type and .lock already set
 */
static void* business_server_thread(void* arg) {
    business_server_t* server = (business_server_t*)arg;
    business_partition_t* config = &business_configs[server->type];

    printf("Starting %s business server on port %d\n",
           config->name, config->base_port);

    // Per-business reactor (edge-triggered).
    server->reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE);

    // Listening socket for this business's port.
    server->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(server->server_fd, true);
    sock_set_reuseport(server->server_fd, true);

    // Bind to the business port.
    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", config->base_port, &addr);
    sock_bind(server->server_fd, &addr);
    sock_listen(server->server_fd, 128);
    sock_set_nonblock(server->server_fd, true);

    // Register this business's handler; the server struct rides along
    // as user data.
    reactor_add(server->reactor, server->server_fd,
                REACTOR_EVENT_READ, config->handler, server);

    // Event loop — never returns.
    while (1) {
        reactor_run(server->reactor, 1000);

        // Warn when the connection cap is reached (the handler is
        // expected to maintain current_connections).
        common_mutex_lock(&server->lock);
        if (server->current_connections >= config->max_connections) {
            LOG_WARN("Business %s reached max connections: %d/%d",
                     config->name,
                     server->current_connections,
                     config->max_connections);
        }
        common_mutex_unlock(&server->lock);
    }
    return NULL;
}

/**
 * @brief Start one thread per business partition and join them.
 *
 * @return -1 if a thread could not be created; otherwise joins forever
 *         (the per-business threads never exit, so the cleanup below
 *         is unreachable in practice).
 */
int start_business_partition_servers(void) {
    business_server_t* servers = common_calloc(BUSINESS_COUNT,
                                               sizeof(business_server_t));

    for (int i = 0; i < BUSINESS_COUNT; i++) {
        servers[i].type = i;
        servers[i].current_connections = 0;
        servers[i].lock = common_mutex_create();

        if (pthread_create(&servers[i].thread, NULL,
                           business_server_thread, &servers[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
    }

    printf("All business partition servers started\n");

    // Block on all business threads.
    for (int i = 0; i < BUSINESS_COUNT; i++) {
        pthread_join(servers[i].thread, NULL);
    }

    common_free(servers);
    return 0;
}

第四部分:多监听fd的负载均衡策略

4.1 负载均衡器实现

/** * @file load_balancer.c * @brief 多监听fd负载均衡器 * * 在前端使用负载均衡器分发连接到后端多个监听fd */ ​ #include "socket.h" #include "reactor.h" #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> ​ #define BACKEND_COUNT 4 ​ /* 后端服务器信息 */ typedef struct { const char* ip; uint16_t port; int current_connections; int weight; // 权重 bool healthy; } backend_server_t; ​ /* 负载均衡器 */ typedef struct { reactor_t reactor; socket_t frontend_fd; // 前端监听fd backend_server_t backends[BACKEND_COUNT]; int backend_count; // 负载均衡算法 enum { LB_ROUND_ROBIN, // 轮询 LB_LEAST_CONN, // 最少连接 LB_WEIGHTED, // 加权轮询 LB_IP_HASH // IP哈希 } algorithm; int next_backend; // 用于轮询 common_mutex_t lock; } load_balancer_t; ​ /** * @brief 选择后端服务器(轮询算法) */ static backend_server_t* select_backend_round_robin(load_balancer_t* lb) { common_mutex_lock(&lb->lock); int start = lb->next_backend; int current = start; do { backend_server_t* backend = &lb->backends[current]; if (backend->healthy) { lb->next_backend = (current + 1) % lb->backend_count; common_mutex_unlock(&lb->lock); return backend; } current = (current + 1) % lb->backend_count; } while (current != start); common_mutex_unlock(&lb->lock); return NULL; // 没有健康的后端 } ​ /** * @brief 选择后端服务器(最少连接算法) */ static backend_server_t* select_backend_least_conn(load_balancer_t* lb) { common_mutex_lock(&lb->lock); backend_server_t* selected = NULL; int min_connections = INT_MAX; for (int i = 0; i < lb->backend_count; i++) { backend_server_t* backend = &lb->backends[i]; if (backend->healthy && backend->current_connections < min_connections) { min_connections = backend->current_connections; selected = backend; } } if (selected) { selected->current_connections++; } common_mutex_unlock(&lb->lock); return selected; } ​ /** * @brief 健康检查定时器 */ static void health_check_timer(reactor_t reactor, int timer_id, void* user_data) { load_balancer_t* lb = (load_balancer_t*)user_data; for (int i = 0; i < lb->backend_count; i++) { backend_server_t* backend = &lb->backends[i]; // 尝试连接到后端 
socket_t test_sock = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM); if (test_sock == SOCKET_INVALID) { continue; } sock_addr_t addr; sock_addr_ipv4(backend->ip, backend->port, &addr); // 设置超时 sock_set_timeout(test_sock, 1000, 1000); bool was_healthy = backend->healthy; backend->healthy = (sock_connect(test_sock, &addr, 1000) == 0); sock_close(test_sock); if (was_healthy != backend->healthy) { printf("Backend %s:%d %s\n", backend->ip, backend->port, backend->healthy ? "became healthy" : "became unhealthy"); } } } ​ /** * @brief 前端accept处理 */ static void on_frontend_accept(reactor_t reactor, int fd, reactor_event_t events, void* user_data) { load_balancer_t* lb = (load_balancer_t*)user_data; if (events & REACTOR_EVENT_READ) { // 接受客户端连接 sock_addr_t client_addr; socket_t client_fd = sock_accept(fd, &client_addr, 0); if (client_fd != SOCKET_INVALID) { // 选择后端服务器 backend_server_t* backend = NULL; switch (lb->algorithm) { case LB_ROUND_ROBIN: backend = select_backend_round_robin(lb); break; case LB_LEAST_CONN: backend = select_backend_least_conn(lb); break; // 其他算法... 
} if (backend) { // 连接到后端服务器 socket_t backend_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM); if (backend_fd != SOCKET_INVALID) { sock_addr_t backend_addr; sock_addr_ipv4(backend->ip, backend->port, &backend_addr); if (sock_connect(backend_fd, &backend_addr, 1000) == 0) { // 连接成功,开始代理 start_proxy_connection(client_fd, backend_fd, backend); } else { sock_close(backend_fd); sock_close(client_fd); backend->healthy = false; } } else { sock_close(client_fd); } } else { // 没有可用后端,关闭客户端连接 sock_close(client_fd); printf("No healthy backend available\n"); } } } } ​ /** * @brief 启动负载均衡器 */ int start_load_balancer(uint16_t frontend_port) { load_balancer_t* lb = common_calloc(1, sizeof(load_balancer_t)); // 配置后端服务器 lb->backends[0] = (backend_server_t){"192.168.1.101", 8080, 0, 1, true}; lb->backends[1] = (backend_server_t){"192.168.1.102", 8080, 0, 1, true}; lb->backends[2] = (backend_server_t){"192.168.1.103", 8080, 0, 2, true}; // 权重2 lb->backends[3] = (backend_server_t){"192.168.1.104", 8080, 0, 1, true}; lb->backend_count = 4; lb->algorithm = LB_WEIGHTED; lb->next_backend = 0; lb->lock = common_mutex_create(); // 创建前端监听socket lb->frontend_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM); sock_set_reuseaddr(lb->frontend_fd, true); sock_addr_t addr; sock_addr_ipv4("0.0.0.0", frontend_port, &addr); sock_bind(lb->frontend_fd, &addr); sock_listen(lb->frontend_fd, 128); sock_set_nonblock(lb->frontend_fd, true); // 创建Reactor lb->reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL); // 添加前端socket到Reactor reactor_add(lb->reactor, lb->frontend_fd, REACTOR_EVENT_READ, on_frontend_accept, lb); // 添加健康检查定时器 reactor_add_timer(lb->reactor, 5000, health_check_timer, lb, true); printf("Load balancer started on port %d\n", frontend_port); printf("Backends: 4 servers with weighted round-robin\n"); // 运行事件循环 while (1) { reactor_run(lb->reactor, 1000); } return 0; }

第五部分:选择策略总结

什么时候使用多监听fd?

场景解决方案优点缺点
CPU多核利用SO_REUSEPORT + 多进程内核级负载均衡,性能最好Linux 3.9+ 要求
业务隔离端口分区业务完全隔离,易于管理需要客户端知道不同端口
地理位置优化基于IP的片区分配优化延迟,符合合规要求需要维护IP库
负载均衡前端负载均衡器灵活,可动态调整单点故障风险
协议分离不同协议不同端口协议处理优化客户端复杂度增加

推荐的最佳实践

// Best practice: hybrid deployment
// 1. SO_REUSEPORT to exploit multiple cores
// 2. Master/slave reactors to separate accept from IO
// 3. Partitioning by business type
// 4. A front-end load balancer for fault tolerance

#include <unistd.h>    /* fork, _exit */

/**
 * @brief Launch all four layers.
 *
 * FIX: every start_* routine below runs an infinite event loop and
 * never returns, so the original sequential calls meant only
 * start_load_balancer(80) ever executed and layers 2-4 never started.
 * Each blocking layer is therefore forked into its own process; the
 * final layer runs in this process.
 */
int main() {
    // Layer 1: load balancer (optional, for cross-machine balancing).
    if (fork() == 0) {
        start_load_balancer(80);
        _exit(0);
    }
    // Layer 2: business partitioning (by port).
    if (fork() == 0) {
        start_business_partition_servers();
        _exit(0);
    }
    // Layer 3: multi-core optimization (SO_REUSEPORT).
    if (fork() == 0) {
        start_reuseport_workers();
        _exit(0);
    }
    // Layer 4: master/slave reactors (intra-process optimization) —
    // runs in this process and never returns.
    init_master_slave_server();

    return 0;
}

具体选择指南

  1. 连接数 < 1000:单监听fd + 单Reactor足够

  2. 连接数 1000-10000:SO_REUSEPORT多进程

  3. 连接数 > 10000:负载均衡器 + 多监听fd集群

  4. 业务复杂:按业务类型端口分区

  5. 全球部署:基于地理位置的片区管理

  6. 协议多样:不同协议不同监听端口

核心原则:根据业务需求、性能要求、运维复杂度三者的平衡来选择。不要过度设计,但要为增长留出扩展空间。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/29 17:15:32

揭秘Symfony 8微服务间通信难题:如何实现高可用与低延迟?

第一章:Symfony 8微服务通信的演进与核心挑战随着分布式架构在现代Web应用中的普及,Symfony 8在微服务通信领域展现出显著的技术演进。框架通过增强对HTTP/2、异步消息队列和API网关的支持,提升了服务间通信的效率与可靠性。开发者不再局限于…

作者头像 李华
网站建设 2026/4/2 1:58:38

算法 --- hash

哈希表简介 什么是 hash 表?hash表就是存储数据的容器 作用:快速查找某个元素 什么时候使用hash表?频繁查找某个数时,可以使用 hash 表 如何使用hash表?1.使用hash表容器;2.使用数组模拟简易hash表 什么时候…

作者头像 李华
网站建设 2026/4/1 19:59:52

【量子计算模拟实战指南】:从零搭建量子电路仿真环境的5大核心步骤

第一章:量子计算的模拟在探索量子计算的过程中,模拟器是理解量子行为和验证算法逻辑的重要工具。由于当前量子硬件仍处于发展初期,大多数研究人员依赖经典计算机上的量子模拟器来运行和调试量子程序。量子模拟的基本原理 量子模拟器通过经典计…

作者头像 李华
网站建设 2026/4/1 23:36:49

【Windows】从守护到终结:解析一个 Java 服务的优雅停止脚本

在微服务架构中,服务的优雅停止与安全启动同等重要。今天我们将深入剖析一个名为 stop.bat 的 Windows 批处理脚本,它展示了如何安全、全面地停止一个名为 "demo" 的 Java 服务。脚本设计理念这个停止脚本体现了"防御式编程"的思想…

作者头像 李华
网站建设 2026/3/20 8:34:49

【空间转录组细胞聚类全攻略】:掌握R语言高效聚类算法与实战技巧

第一章:空间转录组细胞聚类概述空间转录组技术结合了传统转录组测序的基因表达分析能力与组织空间位置信息,为解析组织微环境中的细胞异质性提供了全新视角。在该技术框架下,细胞聚类成为识别不同细胞类型及其空间分布模式的核心步骤。通过将…

作者头像 李华
网站建设 2026/4/1 2:55:21

为什么你的应用越跑越慢?内存碎片正在悄悄吞噬资源

第一章:为什么你的应用越跑越慢?内存碎片正在悄悄吞噬资源在长期运行的应用中,性能逐渐下降是一个常见却容易被忽视的问题。尽管代码逻辑没有变化,系统资源监控也未报警,但响应时间变长、GC频率升高、内存占用持续增长…

作者头像 李华