第一部分:什么情况下需要多个监听fd?
1.1 单监听fd的局限性
// 单监听fd的典型架构 socket_t server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM); sock_bind(server_fd, "0.0.0.0", 8080); sock_listen(server_fd, 128); // 所有客户端都连接到同一个fd // 问题: // 1. 单一瓶颈点 // 2. 无法充分利用多核CPU // 3. 单个队列可能成为性能瓶颈 // 4. 无法实现连接隔离
1.2 需要多个监听fd的场景
| 场景 | 原因 | 解决方案 |
|---|---|---|
| CPU多核优化 | 单个进程无法利用多核 | 多监听fd + 多进程/线程 |
| 负载均衡 | 单个队列压力大 | 多队列负载均衡 |
| 协议分离 | 不同协议需要不同处理 | 不同端口监听 |
| 连接隔离 | 业务分区需求 | 按用户/区域分区 |
| 容错冗余 | 单点故障风险 | 多监听fd热备 |
| IPv4/IPv6双栈 | 同时支持两种协议 | 分别监听 |
第二部分:多监听fd的架构模式
2.1 SO_REUSEPORT模式(推荐)
/**
 * @file multi_listen_reuseport.c
 * @brief Multiple listening fds via SO_REUSEPORT.
 *
 * Supported since Linux 3.9: several sockets may bind the same port and
 * the kernel load-balances incoming connections across them.
 */
#include "socket.h"
#include "reactor.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

#define WORKER_COUNT 4 // number of worker processes

/**
 * @brief Worker-process body: private reactor + private listening socket.
 *
 * @param worker_id zero-based worker index (used for logging and as the
 *                  user_data passed to the accept callback)
 * @param port      TCP port that every worker binds via SO_REUSEPORT
 */
static void worker_process(int worker_id, uint16_t port) {
    printf("Worker %d started (PID: %d)\n", worker_id, getpid());

    // Each worker owns its own edge-triggered reactor.
    reactor_t reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE);

    // Listening socket — every worker binds the SAME port.
    socket_t server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);

    // Key step: SO_REUSEPORT lets all workers share one port.
    sock_set_reuseport(server_fd, true);
    sock_set_reuseaddr(server_fd, true);

    // Bind to the shared port.
    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", port, &addr);
    sock_bind(server_fd, &addr);

    // Start listening, then go non-blocking for the reactor.
    sock_listen(server_fd, 128);
    sock_set_nonblock(server_fd, true);

    // Register the listener. NOTE(review): on_accept is defined elsewhere;
    // the worker id is smuggled to it through the user_data pointer.
    reactor_add(reactor, server_fd, REACTOR_EVENT_READ,
                on_accept, (void*)(long)worker_id);

    printf("Worker %d listening on port %d\n", worker_id, port);

    // Event loop (1 s timeout per iteration); never returns.
    while (1) {
        reactor_run(reactor, 1000);
    }
}

/**
 * @brief Master process: forks WORKER_COUNT workers, then reaps them.
 */
int main(int argc, char* argv[]) {
    uint16_t port = 8080;

    printf("Master process (PID: %d) starting %d workers...\n",
           getpid(), WORKER_COUNT);

    // Fork the worker processes.
    for (int i = 0; i < WORKER_COUNT; i++) {
        pid_t pid = fork();
        if (pid == 0) {
            // Child: run the worker loop (never returns normally).
            worker_process(i, port);
            exit(0);
        } else if (pid < 0) {
            perror("fork");
            exit(1);
        }
        printf("Started worker %d with PID: %d\n", i, pid);
    }

    // Parent: wait for all children (blocks forever in practice).
    for (int i = 0; i < WORKER_COUNT; i++) {
        wait(NULL);
    }

    return 0;
}
SO_REUSEPORT的优势:
内核级负载均衡
连接均匀分配到不同进程
减少锁竞争
更好的CPU缓存局部性
2.2 主从Reactor模式
/**
 * @file master_slave_reactor.c
 * @brief Master/slave reactor pattern.
 *
 * The master reactor only accepts connections; each slave reactor runs in
 * its own thread and handles the IO of the connections assigned to it.
 */
#include "socket.h"
#include "reactor.h"
#include <pthread.h>
#include <stdio.h>  /* FIX: printf/perror were used without this include */
#include <time.h>   /* FIX: time()/time_t were used without this include */

#define SLAVE_COUNT 4 // number of slave reactors

/* Per-slave reactor context. */
typedef struct {
    reactor_t reactor;   // slave's private reactor (created inside its thread)
    pthread_t thread;
    int id;
    int event_count;     // total events processed (statistics only)
} slave_reactor_t;

/* Global server context. */
typedef struct {
    socket_t server_fd;
    reactor_t master_reactor;
    slave_reactor_t slaves[SLAVE_COUNT];
    int next_slave;      // round-robin cursor for connection assignment
    common_mutex_t lock; // protects next_slave
} server_context_t;

/**
 * @brief Slave reactor thread: create the reactor, then loop forever.
 */
static void* slave_thread_func(void* arg) {
    slave_reactor_t* slave = (slave_reactor_t*)arg;

    printf("Slave reactor %d started\n", slave->id);

    // Create this slave's reactor. NOTE(review): the master may try to
    // assign a connection before this line runs — real code must publish
    // the reactor safely (e.g. create it before starting the thread).
    slave->reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE);

    // FIX: this used to be a function-local 'static', i.e. ONE variable
    // shared (and raced on) by all slave threads. A plain local gives each
    // thread its own 10-second stats interval.
    time_t last_print = 0;

    while (1) {
        int processed = reactor_run(slave->reactor, 1000);
        slave->event_count += processed;

        // Print statistics roughly every 10 seconds.
        time_t now = time(NULL);
        if (now - last_print >= 10) {
            printf("Slave %d: processed %d events total\n",
                   slave->id, slave->event_count);
            last_print = now;
        }
    }

    return NULL;
}

/**
 * @brief Master accept callback: accept and hand off to a slave (round robin).
 */
static void on_master_accept(reactor_t reactor, int fd,
                             reactor_event_t events, void* user_data) {
    server_context_t* ctx = (server_context_t*)user_data;

    if (events & REACTOR_EVENT_READ) {
        // Accept the new connection.
        socket_t client_fd = sock_accept(ctx->server_fd, NULL, 0);
        if (client_fd == SOCKET_INVALID) {
            return;
        }

        sock_set_nonblock(client_fd, true);

        // Pick the next slave (round robin) under the lock.
        common_mutex_lock(&ctx->lock);
        int slave_id = ctx->next_slave;
        ctx->next_slave = (ctx->next_slave + 1) % SLAVE_COUNT;
        common_mutex_unlock(&ctx->lock);

        slave_reactor_t* slave = &ctx->slaves[slave_id];

        // Per-connection context.
        client_context_t* client_ctx = common_calloc(1, sizeof(client_context_t));
        if (client_ctx == NULL) { // FIX: handle allocation failure
            sock_close(client_fd);
            return;
        }
        client_ctx->sock = client_fd;
        client_ctx->slave_id = slave_id;

        // Hand the fd to the slave reactor (requires cross-thread wakeup).
        // A real implementation must notify the slave via a pipe or eventfd.
        add_client_to_slave(slave, client_fd, client_ctx);

        printf("Accepted client fd=%d, assigned to slave %d\n",
               client_fd, slave_id);
    }
}

/**
 * @brief Initialize and run the master/slave server.
 *
 * @param port TCP port for the master listening socket
 * @return -1 on thread-creation failure; otherwise loops forever
 */
int init_master_slave_server(uint16_t port) {
    server_context_t* ctx = common_calloc(1, sizeof(server_context_t));

    // Listening socket.
    ctx->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(ctx->server_fd, true);

    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", port, &addr);
    sock_bind(ctx->server_fd, &addr);
    sock_listen(ctx->server_fd, 128);
    sock_set_nonblock(ctx->server_fd, true);

    // The master reactor only handles accepts (level-triggered).
    ctx->master_reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL);
    reactor_add(ctx->master_reactor, ctx->server_fd,
                REACTOR_EVENT_READ, on_master_accept, ctx);

    // Round-robin state.
    ctx->lock = common_mutex_create();
    ctx->next_slave = 0;

    // Spawn the slave reactor threads.
    for (int i = 0; i < SLAVE_COUNT; i++) {
        ctx->slaves[i].id = i;
        ctx->slaves[i].event_count = 0;
        if (pthread_create(&ctx->slaves[i].thread, NULL,
                           slave_thread_func, &ctx->slaves[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
    }

    printf("Master-slave server started on port %d\n", port);
    printf("Master reactor handling accepts, %d slaves handling IO\n", SLAVE_COUNT);

    // Run the master reactor; never returns.
    while (1) {
        reactor_run(ctx->master_reactor, 1000);
    }

    return 0;
}
2.3 端口分区模式
/**
 * @file port_partition.c
 * @brief Port-partition pattern.
 *
 * Each business protocol gets its own port and its own reactor,
 * e.g. HTTP:8080, WebSocket:8081, API:8082.
 */
#include "socket.h"
#include "reactor.h"
#include <pthread.h>

/* Configuration for one listening port. */
typedef struct {
    uint16_t port;
    const char* name;             // human-readable partition name
    reactor_callback_t handler;   // accept handler for this port
} port_config_t;

/* One port partition = one reactor + one listening socket + one thread. */
typedef struct {
    reactor_t reactor;
    socket_t server_fd;
    port_config_t config;
    pthread_t thread;
} port_partition_t;

/**
 * @brief HTTP accept handler: short-lived request/response connections.
 */
static void http_handler(reactor_t reactor, int fd, reactor_event_t events, void* user_data) {
    if (events & REACTOR_EVENT_READ) {
        socket_t client_fd = sock_accept(fd, NULL, 0);
        if (client_fd != SOCKET_INVALID) {
            printf("HTTP connection accepted on fd=%d\n", client_fd);
            // Handle the HTTP request synchronously, then close.
            // NOTE(review): handle_http_request is defined elsewhere.
            handle_http_request(client_fd);
            sock_close(client_fd);
        }
    }
}

/**
 * @brief WebSocket accept handler: handshake, then keep as a long connection.
 */
static void websocket_handler(reactor_t reactor, int fd, reactor_event_t events, void* user_data) {
    if (events & REACTOR_EVENT_READ) {
        socket_t client_fd = sock_accept(fd, NULL, 0);
        if (client_fd != SOCKET_INVALID) {
            printf("WebSocket connection accepted on fd=%d\n", client_fd);
            // Perform the WebSocket handshake.
            if (websocket_handshake(client_fd)) {
                // Success: register for long-lived IO on this reactor.
                reactor_add(reactor, client_fd, REACTOR_EVENT_READ,
                            on_websocket_data, NULL);
            } else {
                sock_close(client_fd);
            }
        }
    }
}

/**
 * @brief Thread body for one port partition: socket + reactor + event loop.
 */
static void* port_partition_thread(void* arg) {
    port_partition_t* partition = (port_partition_t*)arg;

    printf("Starting %s server on port %d\n",
           partition->config.name, partition->config.port);

    // Per-partition reactor (level-triggered).
    partition->reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL);

    // Listening socket for this partition's port.
    partition->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(partition->server_fd, true);

    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", partition->config.port, &addr);
    sock_bind(partition->server_fd, &addr);
    sock_listen(partition->server_fd, 128);
    sock_set_nonblock(partition->server_fd, true);

    // Route accepts to this partition's protocol handler.
    reactor_add(partition->reactor, partition->server_fd,
                REACTOR_EVENT_READ, partition->config.handler, NULL);

    // Event loop; never returns.
    while (1) {
        reactor_run(partition->reactor, 1000);
    }

    return NULL;
}

/**
 * @brief Start one thread per configured port partition, then join them.
 * @return 0 on success, -1 if a thread could not be created
 */
int start_port_partition_server(void) {
    // Static partition table: port, name, handler.
    port_config_t configs[] = {
        {8080, "HTTP", http_handler},
        {8081, "WebSocket", websocket_handler},
        {8082, "API", http_handler},     // REST API
        {8083, "Admin", http_handler}    // admin interface
    };

    int config_count = sizeof(configs) / sizeof(configs[0]);
    port_partition_t* partitions = common_calloc(config_count, sizeof(port_partition_t));

    // Launch every partition in its own thread.
    for (int i = 0; i < config_count; i++) {
        partitions[i].config = configs[i];
        if (pthread_create(&partitions[i].thread, NULL,
                           port_partition_thread, &partitions[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
        printf("Started %s server on port %d\n", configs[i].name, configs[i].port);
    }

    // Wait for all partition threads (they loop forever in practice).
    for (int i = 0; i < config_count; i++) {
        pthread_join(partitions[i].thread, NULL);
    }

    common_free(partitions);
    return 0;
}
第三部分:按片区管理的具体实现
3.1 基于地理位置的片区管理
/** * @file geo_partition.c * @brief 基于地理位置的片区管理 * * 根据不同地区用户分配到不同的服务器组 */ #include <netinet/in.h> #include <arpa/inet.h> /* 地理片区定义 */ typedef enum { REGION_ASIA = 0, REGION_EUROPE, REGION_AMERICA, REGION_COUNT } geo_region_t; /* 片区配置 */ typedef struct { const char* cidr; // IP段 CIDR表示法 geo_region_t region; } ip_region_t; /* IP到片区的映射配置 */ static ip_region_t ip_regions[] = { {"1.0.0.0/8", REGION_ASIA}, // 亚太 {"14.0.0.0/8", REGION_ASIA}, {"27.0.0.0/8", REGION_ASIA}, {"36.0.0.0/8", REGION_ASIA}, {"49.0.0.0/8", REGION_ASIA}, {"2.0.0.0/8", REGION_EUROPE}, // 欧洲 {"5.0.0.0/8", REGION_EUROPE}, {"31.0.0.0/8", REGION_EUROPE}, {"37.0.0.0/8", REGION_EUROPE}, {"46.0.0.0/8", REGION_EUROPE}, {"62.0.0.0/8", REGION_EUROPE}, {"77.0.0.0/8", REGION_EUROPE}, {"78.0.0.0/8", REGION_EUROPE}, {"79.0.0.0/8", REGION_EUROPE}, {"80.0.0.0/8", REGION_EUROPE}, {"81.0.0.0/8", REGION_EUROPE}, {"82.0.0.0/8", REGION_EUROPE}, {"83.0.0.0/8", REGION_EUROPE}, {"84.0.0.0/8", REGION_EUROPE}, {"85.0.0.0/8", REGION_EUROPE}, {"86.0.0.0/8", REGION_EUROPE}, {"87.0.0.0/8", REGION_EUROPE}, {"88.0.0.0/8", REGION_EUROPE}, {"89.0.0.0/8", REGION_EUROPE}, {"90.0.0.0/8", REGION_EUROPE}, {"91.0.0.0/8", REGION_EUROPE}, {"3.0.0.0/8", REGION_AMERICA}, // 美洲 {"4.0.0.0/8", REGION_AMERICA}, {"6.0.0.0/8", REGION_AMERICA}, {"7.0.0.0/8", REGION_AMERICA}, {"8.0.0.0/8", REGION_AMERICA}, {"9.0.0.0/8", REGION_AMERICA}, {"11.0.0.0/8", REGION_AMERICA}, {"12.0.0.0/8", REGION_AMERICA}, {"13.0.0.0/8", REGION_AMERICA}, {"15.0.0.0/8", REGION_AMERICA}, {"16.0.0.0/8", REGION_AMERICA}, {"17.0.0.0/8", REGION_AMERICA}, {"18.0.0.0/8", REGION_AMERICA}, {"19.0.0.0/8", REGION_AMERICA}, {"20.0.0.0/8", REGION_AMERICA}, {"21.0.0.0/8", REGION_AMERICA}, {"22.0.0.0/8", REGION_AMERICA}, {"23.0.0.0/8", REGION_AMERICA}, {"24.0.0.0/8", REGION_AMERICA}, {"25.0.0.0/8", REGION_AMERICA}, {"26.0.0.0/8", REGION_AMERICA}, {"28.0.0.0/8", REGION_AMERICA}, {"29.0.0.0/8", REGION_AMERICA}, {"30.0.0.0/8", REGION_AMERICA}, {"32.0.0.0/8", REGION_AMERICA}, {"33.0.0.0/8", 
REGION_AMERICA}, {"34.0.0.0/8", REGION_AMERICA}, {"35.0.0.0/8", REGION_AMERICA}, {"38.0.0.0/8", REGION_AMERICA}, {"40.0.0.0/8", REGION_AMERICA}, {"44.0.0.0/8", REGION_AMERICA}, {"45.0.0.0/8", REGION_AMERICA}, {"47.0.0.0/8", REGION_AMERICA}, {"48.0.0.0/8", REGION_AMERICA}, {"50.0.0.0/8", REGION_AMERICA}, {"52.0.0.0/8", REGION_AMERICA}, {"54.0.0.0/8", REGION_AMERICA}, {"56.0.0.0/8", REGION_AMERICA}, {"63.0.0.0/8", REGION_AMERICA}, {"64.0.0.0/8", REGION_AMERICA}, {"65.0.0.0/8", REGION_AMERICA}, {"66.0.0.0/8", REGION_AMERICA}, {"67.0.0.0/8", REGION_AMERICA}, {"68.0.0.0/8", REGION_AMERICA}, {"69.0.0.0/8", REGION_AMERICA}, {"70.0.0.0/8", REGION_AMERICA}, {"71.0.0.0/8", REGION_AMERICA}, {"72.0.0.0/8", REGION_AMERICA}, {"73.0.0.0/8", REGION_AMERICA}, {"74.0.0.0/8", REGION_AMERICA}, {"75.0.0.0/8", REGION_AMERICA}, {"76.0.0.0/8", REGION_AMERICA}, }; /** * @brief 检查IP是否在CIDR范围内 */ static bool ip_in_cidr(uint32_t ip, const char* cidr) { struct in_addr network, mask; char network_str[INET_ADDRSTRLEN]; int prefix_len; // 解析CIDR "1.0.0.0/8" if (sscanf(cidr, "%[^/]/%d", network_str, &prefix_len) != 2) { return false; } if (inet_pton(AF_INET, network_str, &network) != 1) { return false; } // 计算掩码 uint32_t mask_value = prefix_len ? 
(~0 << (32 - prefix_len)) : 0; mask.s_addr = htonl(mask_value); // 检查IP是否在范围内 return (ip & mask.s_addr) == (network.s_addr & mask.s_addr); } /** * @brief 根据客户端IP确定片区 */ static geo_region_t get_client_region(const char* client_ip) { struct in_addr addr; if (inet_pton(AF_INET, client_ip, &addr) != 1) { return REGION_ASIA; // 默认 } uint32_t ip = addr.s_addr; int region_count = sizeof(ip_regions) / sizeof(ip_regions[0]); for (int i = 0; i < region_count; i++) { if (ip_in_cidr(ip, ip_regions[i].cidr)) { return ip_regions[i].region; } } return REGION_ASIA; // 默认 } /** * @brief 片区感知的accept处理 */ static void on_geo_accept(reactor_t reactor, int fd, reactor_event_t events, void* user_data) { if (events & REACTOR_EVENT_READ) { sock_addr_t client_addr; socket_t client_fd = sock_accept(fd, &client_addr, 0); if (client_fd != SOCKET_INVALID) { // 获取客户端IP char ip_str[INET_ADDRSTRLEN]; sock_addr_str(&client_addr, ip_str, sizeof(ip_str)); // 确定片区 geo_region_t region = get_client_region(ip_str); // 根据片区分配到不同的工作队列 assign_to_region_queue(client_fd, region, ip_str); printf("Client %s assigned to region %d\n", ip_str, region); } } }3.2 基于业务类型的片区管理
/**
 * @file business_partition.c
 * @brief Business-type partitioning.
 *
 * Each business type gets its own listening fd, port, limits and handler.
 */

/* Business types. */
typedef enum {
    BUSINESS_WEB = 0,   // web requests
    BUSINESS_API,       // API calls
    BUSINESS_WS,        // WebSocket
    BUSINESS_STREAM,    // streaming media
    BUSINESS_BATCH,     // batch processing
    BUSINESS_COUNT
} business_type_t;

/* Per-business partition configuration. */
typedef struct {
    uint16_t base_port;          // base listening port
    int max_connections;         // connection cap
    int timeout_ms;              // idle timeout (0 = none)
    const char* name;            // business name
    reactor_callback_t handler;  // business handler
} business_partition_t;

/* Global configuration table, indexed by business type. */
static business_partition_t business_configs[BUSINESS_COUNT] = {
    [BUSINESS_WEB] = {
        .base_port = 8000, .max_connections = 10000,
        .timeout_ms = 30000, .name = "Web",
        .handler = web_request_handler
    },
    [BUSINESS_API] = {
        .base_port = 8100, .max_connections = 5000,
        .timeout_ms = 10000, .name = "API",
        .handler = api_request_handler
    },
    [BUSINESS_WS] = {
        .base_port = 8200, .max_connections = 2000,
        .timeout_ms = 0,  // long-lived connections, no timeout
        .name = "WebSocket",
        .handler = websocket_handler
    },
    [BUSINESS_STREAM] = {
        .base_port = 8300, .max_connections = 1000,
        .timeout_ms = 60000, .name = "Stream",
        .handler = stream_handler
    },
    [BUSINESS_BATCH] = {
        .base_port = 8400, .max_connections = 100,
        .timeout_ms = 300000,  // 5 minutes
        .name = "Batch",
        .handler = batch_handler
    }
};

/* One running business-partition server. */
typedef struct {
    business_type_t type;
    reactor_t reactor;
    socket_t server_fd;
    int current_connections;  // guarded by lock
    pthread_t thread;
    common_mutex_t lock;
} business_server_t;

/**
 * @brief Thread body for one business server: socket + reactor + event loop.
 */
static void* business_server_thread(void* arg) {
    business_server_t* server = (business_server_t*)arg;
    business_partition_t* config = &business_configs[server->type];

    printf("Starting %s business server on port %d\n",
           config->name, config->base_port);

    // Per-business reactor (edge-triggered).
    server->reactor = reactor_create(1024, REACTOR_TRIGGER_EDGE);

    // Listening socket on the business's base port.
    server->server_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(server->server_fd, true);
    sock_set_reuseport(server->server_fd, true);

    // Bind to the business port.
    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", config->base_port, &addr);
    sock_bind(server->server_fd, &addr);
    sock_listen(server->server_fd, 128);
    sock_set_nonblock(server->server_fd, true);

    // Route events to this business's handler.
    reactor_add(server->reactor, server->server_fd,
                REACTOR_EVENT_READ, config->handler, server);

    // Event loop; checks the connection cap once per iteration.
    while (1) {
        reactor_run(server->reactor, 1000);

        // Warn (but do not act) when the connection cap is reached.
        common_mutex_lock(&server->lock);
        if (server->current_connections >= config->max_connections) {
            LOG_WARN("Business %s reached max connections: %d/%d",
                     config->name, server->current_connections,
                     config->max_connections);
        }
        common_mutex_unlock(&server->lock);
    }

    return NULL;
}

/**
 * @brief Start one server thread per business type, then join them.
 * @return 0 on success, -1 if a thread could not be created
 */
int start_business_partition_servers(void) {
    business_server_t* servers = common_calloc(BUSINESS_COUNT, sizeof(business_server_t));

    for (int i = 0; i < BUSINESS_COUNT; i++) {
        servers[i].type = i;
        servers[i].current_connections = 0;
        servers[i].lock = common_mutex_create();

        if (pthread_create(&servers[i].thread, NULL,
                           business_server_thread, &servers[i]) != 0) {
            perror("pthread_create");
            return -1;
        }
    }

    printf("All business partition servers started\n");

    // Wait for all business threads (they loop forever in practice).
    for (int i = 0; i < BUSINESS_COUNT; i++) {
        pthread_join(servers[i].thread, NULL);
    }

    common_free(servers);
    return 0;
}
第四部分:多监听fd的负载均衡策略
4.1 负载均衡器实现
/**
 * @file load_balancer.c
 * @brief Front-end load balancer for multiple backend listening fds.
 *
 * A single front-end listener accepts clients and proxies each connection
 * to one of several backend servers.
 */
#include "socket.h"
#include "reactor.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h> /* FIX: INT_MAX is used below but was never included */

#define BACKEND_COUNT 4

/* One backend server. */
typedef struct {
    const char* ip;
    uint16_t port;
    int current_connections;
    int weight;   // weight for weighted algorithms
    bool healthy; // NOTE(review): written by health checks and the accept
                  // path without holding lock — confirm this is acceptable
} backend_server_t;

/* Load balancer state. */
typedef struct {
    reactor_t reactor;
    socket_t frontend_fd;  // front-end listening fd
    backend_server_t backends[BACKEND_COUNT];
    int backend_count;

    // Balancing algorithm selector.
    enum {
        LB_ROUND_ROBIN,  // round robin
        LB_LEAST_CONN,   // least connections
        LB_WEIGHTED,     // weighted round robin
        LB_IP_HASH       // source-IP hash
    } algorithm;

    int next_backend;    // round-robin cursor
    common_mutex_t lock;
} load_balancer_t;

/**
 * @brief Pick a backend by round robin, skipping unhealthy ones.
 * @return selected backend, or NULL when none is healthy
 */
static backend_server_t* select_backend_round_robin(load_balancer_t* lb) {
    common_mutex_lock(&lb->lock);

    int start = lb->next_backend;
    int current = start;

    do {
        backend_server_t* backend = &lb->backends[current];
        if (backend->healthy) {
            lb->next_backend = (current + 1) % lb->backend_count;
            common_mutex_unlock(&lb->lock);
            return backend;
        }
        current = (current + 1) % lb->backend_count;
    } while (current != start);

    common_mutex_unlock(&lb->lock);
    return NULL; // no healthy backend
}

/**
 * @brief Pick the healthy backend with the fewest connections.
 *
 * Increments the winner's connection count under the lock.
 * @return selected backend, or NULL when none is healthy
 */
static backend_server_t* select_backend_least_conn(load_balancer_t* lb) {
    common_mutex_lock(&lb->lock);

    backend_server_t* selected = NULL;
    int min_connections = INT_MAX;

    for (int i = 0; i < lb->backend_count; i++) {
        backend_server_t* backend = &lb->backends[i];
        if (backend->healthy && backend->current_connections < min_connections) {
            min_connections = backend->current_connections;
            selected = backend;
        }
    }

    if (selected) {
        selected->current_connections++;
    }

    common_mutex_unlock(&lb->lock);
    return selected;
}

/**
 * @brief Periodic health check: try a short TCP connect to each backend.
 *
 * NOTE(review): sock_connect here can block the event loop for up to 1 s
 * per backend; production code should probe asynchronously.
 */
static void health_check_timer(reactor_t reactor, int timer_id, void* user_data) {
    load_balancer_t* lb = (load_balancer_t*)user_data;

    for (int i = 0; i < lb->backend_count; i++) {
        backend_server_t* backend = &lb->backends[i];

        // Probe connection.
        socket_t test_sock = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
        if (test_sock == SOCKET_INVALID) {
            continue;
        }

        sock_addr_t addr;
        sock_addr_ipv4(backend->ip, backend->port, &addr);

        // Short send/recv timeouts for the probe.
        sock_set_timeout(test_sock, 1000, 1000);

        bool was_healthy = backend->healthy;
        backend->healthy = (sock_connect(test_sock, &addr, 1000) == 0);
        sock_close(test_sock);

        if (was_healthy != backend->healthy) {
            printf("Backend %s:%d %s\n", backend->ip, backend->port,
                   backend->healthy ? "became healthy" : "became unhealthy");
        }
    }
}

/**
 * @brief Front-end accept: pick a backend and start proxying.
 */
static void on_frontend_accept(reactor_t reactor, int fd, reactor_event_t events, void* user_data) {
    load_balancer_t* lb = (load_balancer_t*)user_data;

    if (events & REACTOR_EVENT_READ) {
        // Accept the client connection.
        sock_addr_t client_addr;
        socket_t client_fd = sock_accept(fd, &client_addr, 0);
        if (client_fd != SOCKET_INVALID) {
            // Choose a backend per the configured algorithm.
            backend_server_t* backend = NULL;

            switch (lb->algorithm) {
                case LB_ROUND_ROBIN:
                    backend = select_backend_round_robin(lb);
                    break;
                case LB_LEAST_CONN:
                    backend = select_backend_least_conn(lb);
                    break;
                // FIX: LB_WEIGHTED and LB_IP_HASH had no case at all, so a
                // balancer configured with them (as start_load_balancer is,
                // with LB_WEIGHTED) left backend == NULL and rejected every
                // connection. Until those algorithms are implemented, fall
                // back to round robin so traffic still flows.
                default:
                    backend = select_backend_round_robin(lb);
                    break;
            }

            if (backend) {
                // Dial the chosen backend.
                socket_t backend_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
                if (backend_fd != SOCKET_INVALID) {
                    sock_addr_t backend_addr;
                    sock_addr_ipv4(backend->ip, backend->port, &backend_addr);

                    if (sock_connect(backend_fd, &backend_addr, 1000) == 0) {
                        // Connected: start relaying between the two fds.
                        start_proxy_connection(client_fd, backend_fd, backend);
                    } else {
                        sock_close(backend_fd);
                        sock_close(client_fd);
                        backend->healthy = false;
                    }
                } else {
                    sock_close(client_fd);
                }
            } else {
                // No usable backend: drop the client.
                sock_close(client_fd);
                printf("No healthy backend available\n");
            }
        }
    }
}

/**
 * @brief Configure and run the load balancer (never returns on success).
 *
 * @param frontend_port TCP port the balancer listens on
 */
int start_load_balancer(uint16_t frontend_port) {
    load_balancer_t* lb = common_calloc(1, sizeof(load_balancer_t));

    // Static backend pool.
    lb->backends[0] = (backend_server_t){"192.168.1.101", 8080, 0, 1, true};
    lb->backends[1] = (backend_server_t){"192.168.1.102", 8080, 0, 1, true};
    lb->backends[2] = (backend_server_t){"192.168.1.103", 8080, 0, 2, true}; // weight 2
    lb->backends[3] = (backend_server_t){"192.168.1.104", 8080, 0, 1, true};
    lb->backend_count = 4;

    lb->algorithm = LB_WEIGHTED;
    lb->next_backend = 0;
    lb->lock = common_mutex_create();

    // Front-end listening socket.
    lb->frontend_fd = sock_create(SOCK_AF_INET, SOCK_TYPE_STREAM);
    sock_set_reuseaddr(lb->frontend_fd, true);

    sock_addr_t addr;
    sock_addr_ipv4("0.0.0.0", frontend_port, &addr);
    sock_bind(lb->frontend_fd, &addr);
    sock_listen(lb->frontend_fd, 128);
    sock_set_nonblock(lb->frontend_fd, true);

    // Reactor (level-triggered) driving accepts and the health timer.
    lb->reactor = reactor_create(1024, REACTOR_TRIGGER_LEVEL);
    reactor_add(lb->reactor, lb->frontend_fd,
                REACTOR_EVENT_READ, on_frontend_accept, lb);

    // Health check every 5 seconds, repeating.
    reactor_add_timer(lb->reactor, 5000, health_check_timer, lb, true);

    printf("Load balancer started on port %d\n", frontend_port);
    printf("Backends: 4 servers with weighted round-robin\n");

    // Event loop; never returns.
    while (1) {
        reactor_run(lb->reactor, 1000);
    }

    return 0;
}
第五部分:选择策略总结
什么时候使用多监听fd?
| 场景 | 解决方案 | 优点 | 缺点 |
|---|---|---|---|
| CPU多核利用 | SO_REUSEPORT + 多进程 | 内核级负载均衡,性能最好 | Linux 3.9+ 要求 |
| 业务隔离 | 端口分区 | 业务完全隔离,易于管理 | 需要客户端知道不同端口 |
| 地理位置优化 | 基于IP的片区分配 | 优化延迟,符合合规要求 | 需要维护IP库 |
| 负载均衡 | 前端负载均衡器 | 灵活,可动态调整 | 单点故障风险 |
| 协议分离 | 不同协议不同端口 | 协议处理优化 | 客户端复杂度增加 |
推荐的最佳实践
// 最佳实践:混合模式 // 1. 使用SO_REUSEPORT利用多核 // 2. 主从Reactor分离accept和IO // 3. 按业务类型分区 // 4. 前端负载均衡器容灾 int main() { // 第1层:负载均衡器(可选,用于跨机器) start_load_balancer(80); // 第2层:业务分区(按端口) start_business_partition_servers(); // 第3层:CPU多核优化(SO_REUSEPORT) start_reuseport_workers(); // 第4层:主从Reactor(单进程内优化) init_master_slave_server(); return 0; }具体选择指南
连接数 < 1000:单监听fd + 单Reactor足够
连接数 1000-10000:SO_REUSEPORT多进程
连接数 > 10000:负载均衡器 + 多监听fd集群
业务复杂:按业务类型端口分区
全球部署:基于地理位置的片区管理
协议多样:不同协议不同监听端口
核心原则:根据业务需求、性能要求、运维复杂度的平衡来选择。不要过度设计,但要为增长留出扩展空间。