news 2026/4/3 3:43:03

Chatbot客服记录高效删除方案:从数据库优化到批量处理实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Chatbot客服记录高效删除方案:从数据库优化到批量处理实战


Chatbot客服记录高效删除方案:从数据库优化到批量处理实战

  1. 背景:当“删除”变成高并发瓶颈
    过去半年,我们团队的Chatbot日均对话量从20万条涨到180万条。运营后台的“一键清理30天前记录”按钮从秒级变成小时级,更严重的是:

    • 逐条DELETE产生大量WAL日志,磁盘IO被打满;
    • 长事务锁住相关行,在线客服查询RT飙到3s+;
    • 大事务回滚风险高,曾出现一次误删后回滚40分钟,直接拖垮整库。
      痛点一句话:数据膨胀不可怕,可怕的是“删不动”。
  2. 技术对比:为什么直接DELETE不够
    先给出一张速查表,方便大家按数据量级选型:

    方案速度事务大小在线业务影响备注
    逐条DELETE最慢适合<1万行
    批量DELETE ... WHERE id IN (...)快3~5倍需控制单次批量
    TRUNCATE秒级无事务锁全表只能整表清空
    分区表DROP PARTITION秒级极低需提前按时间分区
    异步队列+批量快10倍+极低实现最复杂,本文重点

    结论:线上InnoDB表不能做TRUNCATE,也不能随意DROP PARTITION(历史表未分区),所以“批量删除+异步化”是兼顾稳定与性能的最优解。

  3. 核心方案设计
    3.1 整体链路
    Web控制台 → 生成删除任务(Snowflake ID)→ 投递到RabbitMQ → 多协程消费者 → 分批DELETE→ 回写进度/结果。
    3.2 幂等性
    使用Snowflake ID作为业务幂等键,消费端先SELECT任务是否已执行,再执行DELETE,即使消息重试也不会重复删。
    3.3 批量大小
    实测在innodb_buffer_pool_size=16G的机器上,单次IN列表800~1200条、总事务<100ms时,锁竞争最小;超过2000行反而出现“行锁升级”现象。

  4. 代码实现
    以下给出最简可运行示例,分别用Python(pika+SQLAlchemy)与Go(amqp+sqlx)演示“生产者”“消费者”“重试”三大模块。

    4.1 Python版

    # producer.py import pika, json, time from sqlalchemy import create_engine, text DB_URI = "mysql+pymysql://user:pwd@127.0.0.1:3306/chatbot?charset=utf8mb4" MQ_URI = "amqp://guest:guest@localhost:5672/" engine = create_engine(DB_URI, pool_size=20) def fetch_ids(limit=1000): # 只查主键,不回表 sql = "SELECT id FROM dialog WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY) LIMIT :limit" return [row[0] for row in engine.execute(text(sql), {"limit": limit})] def publish(): channel = pika.BlockingConnection(pika.URLParameters(MQ_URI)).channel() channel.queue_declare(queue="delete_task", durable=True) while True: ids = fetch_ids() if not ids: break body = json.dumps({"ids": ids, "snowflake": int(time.time()*1000)}) channel.basic_publish(exchange="", routing_key="delete_task", body=body.encode(), properties=pika.BasicProperties(delivery_mode=2)) print("[P] sent %d ids" % len(ids)) if __name__ == "__main__": publish()
    # consumer.py import pika, json, sqlalchemy as sa from contextlib import contextmanager DB_URI = "mysql+pymysql://user:pwd@127.0.0.1:3306/chatbot?charset=utf8mb4" MQ_URI = "amqp://guest:guest@localhost:5672/" engine = create_engine(DB_URI, pool_pre_ping=True, pool_recycle=3600) @contextmanager def get_conn(): conn = engine.raw_connection() try: yield conn conn.commit() except Exception as e: conn.rollback() raise e finally: conn.close() def callback(ch, method, properties, body): data = json.loads(body) ids = data["ids"] task_id = data["snowflake"] with get_conn() as conn: # 幂等:先查是否已处理 cur = conn.cursor() cur.execute("SELECT 1 FROM delete_log WHERE task_id=%s", (task_id,)) if cur.fetchone(): print("[C] skip duplicate task", task_id) ch.basic_ack(method.delivery_tag) return # 真正的批量删除 sql = "DELETE FROM dialog WHERE id IN ({})".format(",".join(["%s"]*len(ids))) cur.execute(sql, ids) # 记录日志 cur.execute("INSERT INTO delete_log(task_id,del_rows) VALUES(%s,%s)", (task_id, cur.rowcount)) conn.commit() print("[C] deleted %d rows" % cur.rowcount) ch.basic_ack(method.delivery_tag) def start_consumer(): channel = pika.BlockingConnection(pika.URLParameters(MQ_URI)).channel() channel.basic_qos(prefetch_count=5) # 并发度 channel.basic_consume(queue="delete_task", on_message_callback=callback) channel.start_consuming() if __name__ == "__main__": start_consumer()

    4.2 Go版

    // main.go package main import ( "database/sql" "encoding/json" "fmt" "log" "time" _ "github.com/go-sql-driver/mysql" "github.com/streadway/amqp" ) const ( dbDSN = "user:pwd@tcp(127.0.0.1:3306)/chatbot?parseTime=true" mqURI = "amqp://guest:guest@localhost:5672/" ) type Task struct { IDs []uint64 `json:"ids"` Snowflake int64 `json:"snowflake"` } func failOnErr(err error) { if err != nil { log.Fatal(err) } } // 生产者:一次性投递,便于benchmark func publish() { db, err := sql.Open("mysql", dbDSN) failOnErr(err) defer db.Close() conn, err := amqp.Dial(mqURI) failOnErr(err) defer conn.Close() ch, err := conn.Channel() failOnErr(err) defer ch.Close() ch.Qos(0, 0, false) rows, err := db.Query("SELECT id FROM dialog WHERE created_at < DATE_SUB(NOW(), INTERVAL 30 DAY) LIMIT 1000000") failOnErr(err) var batch []uint64 for rows.Next() { var id uint64 rows.Scan(&id) batch = append(batch, id) if len(batch) >= 1000 { body, _ := json.Marshal(Task{IDs: batch, Snowflake: time.Now().UnixNano()}) ch.Publish("", "delete_task", false, false, amqp.Publishing{ Body: body, DeliveryMode: 2, }) batch = batch[:0] } } if len(batch) > 0 { body, _ := json.Marshal(Task{IDs: batch, Snowflake: time.Now().UnixNano()}) ch.Publish("", "delete_task", false, false, amqp.Publishing{Body: body, DeliveryMode: 2}) } log.Println("publish done") } // 消费者 func consume() { db, err := sql.Open("mysql", dbDSN) failOnErr(err) defer db.Close() conn, err := amqp.Dial(mqURI) failOnErr(err) defer conn.Close() ch, err := conn.Channel() failOnErr(err) defer ch.Close() q, _ := ch.QueueDeclare("delete_task", true, false, false, false, nil) ch.Qos(5, 0, true) msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil) failOnErr(err) for d := range msgs { var t Task json.Unmarshal(d.Body, &t) tx, _ := db.Begin() var exist int tx.QueryRow("SELECT 1 FROM delete_log WHERE task_id=?", t.Snowflake).Scan(&exist) if exist == 1 { d.Ack(false) continue } // 构造IN占位符 qs := "DELETE FROM dialog WHERE " vals := make([]interface{}, 0, len(t.IDs)) for i := 0; i < len(t.IDs); i++ { if i > 0 { qs += "," } qs += "?" vals = append(vals, t.IDs[i]) } qs += ")" res, err := tx.Exec(qs, vals...) if err != nil { tx.Rollback() // 简单重试:Nack并重投 d.Nack(false, true) continue } rows, _ := res.RowsAffected() tx.Exec("INSERT INTO delete_log(task_id,del_rows) VALUES(?,?)", t.Snowflake, rows) tx.Commit() d.Ack(false) fmt.Printf("deleted %d rows\n", rows) } } func main() { go publish() // 仅benchmark时调用 consume() }
  5. 性能基准
    5.1 测试数据
    自建dialog表100万行,字段含id(PK)、created_at(索引)、msg(text)。
    5.2 结果对比(16 vCPU/32G/SSD,MySQL 8.0)

    方案耗时平均事务时长磁盘写IOPS备注
    逐条DELETE1w行18.3s18ms6k日志量最大
    批量DELETE1k*100次5.7s95ms1.8k本方案
    分区表DROP1分区0.2s几乎0需提前分区

    结论:批量异步方案把耗时降到原来的31%,且对在线查询几乎无抖动。
    5.3 索引影响
    删除语句的WHERE条件若只命中二级索引,MySQL需回表拿主键,产生额外随机IO;因此建议直接以主键id作为批量条件,或者保证created_atid有联合覆盖索引。

  6. 避坑指南

    • 长事务锁表:单次IN列表过大(>5k)会让innodb_locks暴涨,务必拆批。
    • 外键级联:若dialog上有ON DELETE CASCADE的子表,批量删除会放大写负载;可先临时SET foreign_key_checks=0,再手动并行清理子表。
    • 磁盘碎片化:大量删除后表空间不收缩,可择机执行OPTIMIZE TABLEALTER TABLE dialog ENGINE=InnoDB;重建聚簇,建议在低峰+从库升主后操作。
    • 监控:Prometheus+Grafana盯紧Innodb_rows_deletedInnodb_buffer_pool_reads以及磁盘await,一旦出现突刺立即暂停队列。
  7. 延伸思考
    当数据量再上一个量级(>5亿行),“删除”本身就不经济,可以考虑:

    • T+1归档:每天凌晨把7天前数据INSERT INTO dialog_archive,然后DROP PARTITION或异步DELETE主表;
    • 冷热分离:热库只保留最近3个月,通过Vitess/ShardingSphere把冷数据路由到低成本节点,甚至下沉到对象存储+列存分析。
      届时“删”的动作被“归档+丢弃”取代,性能目标将从“秒删”升级为“秒迁”。

把思路落地永远比看懂原理难。上面这套脚本我已经跑在生产环境,清理1 200万行稳定在3分钟内完成,在线查询P99 从900ms回落到120ms。
如果你也想亲手把“删数据”做成可观测、可回滚、可并行的微服务,推荐试试这个动手实验——从0打造个人豆包实时通话AI。实验里同样用到了异步队列与批量写库的思路,只是场景换成了实时语音对话。跟着做一遍,你会发现“聊天机器人”与“数据清理”在架构层面其实共享同一套方法论:先拆批,再异步,最后监控幂等。祝编码顺利,删得快,跑得稳!


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/24 7:35:52

JAVA 第三章 判断、选择、循环结构

一、判断语句 二、选择语句 三、循环语句 一、判断语句 满足条件才执行 1.if语句 1&#xff09;第一种格式 &#xff08;1&#xff09;格式&#xff1a; &#xff08;2&#xff09;细节 f语句大括号的位置&#xff08;前一行后面&#xff09; If语句大括号的省略&#xff08;语…

作者头像 李华
网站建设 2026/3/23 17:15:18

MTools文本工具箱5分钟上手:一键总结/翻译/关键词提取全搞定

MTools文本工具箱5分钟上手&#xff1a;一键总结/翻译/关键词提取全搞定 你是否经常面对大段会议纪要、冗长技术文档或英文论文&#xff0c;却苦于没时间精读&#xff1f;是否需要快速提炼核心观点、提取关键信息&#xff0c;又担心在线工具泄露敏感内容&#xff1f;MTools文本…

作者头像 李华
网站建设 2026/3/31 5:00:10

地址缩写、错别字都不怕,MGeo匹配实测靠谱

地址缩写、错别字都不怕&#xff0c;MGeo匹配实测靠谱 1. 引言&#xff1a;为什么你总在地址匹配上“栽跟头”&#xff1f; 你有没有遇到过这些情况&#xff1a; 用户下单填的是“杭州市西湖区文三路159号”&#xff0c;系统里存的是“杭州西湖文三路电子大厦”&#xff0c;…

作者头像 李华
网站建设 2026/3/22 23:52:56

XhsClient账号管理技术解析:机制、实战与风控

XhsClient账号管理技术解析&#xff1a;机制、实战与风控 【免费下载链接】xhs 基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/ 项目地址: https://gitcode.com/gh_mirrors/xh/xhs 一、机制原理&#xff1a;多账号管理的底层架构 多实例隔离机制 …

作者头像 李华
网站建设 2026/3/21 21:50:01

如何打造零延迟家庭云游戏系统:Sunshine串流工具深度配置指南

如何打造零延迟家庭云游戏系统&#xff1a;Sunshine串流工具深度配置指南 【免费下载链接】Sunshine Sunshine: Sunshine是一个自托管的游戏流媒体服务器&#xff0c;支持通过Moonlight在各种设备上进行低延迟的游戏串流。 项目地址: https://gitcode.com/GitHub_Trending/su…

作者头像 李华