AI摘要

本文介绍了一种基于MySQL Binlog的闪回恢复方案,用于恢复因误操作而删除的数据。方案包括定位binlog文件、解析binlog、将DELETE事件转换为INSERT、导入恢复数据、验证数据和回写原库等步骤。同时,提供了Python脚本用于将binlog中的DELETE事件转换为INSERT语句,并讨论了方案的适用场景和局限性。

背景

2026-04-18 18:05:15,FSSE 数据中心系统通过 FsseDataVersionApiService.deleteMany() 接口删除了 7 个数据版本,触发级联删除,清空了终端表 ads_rpt_team_series(252,830 行)和 rpt_template_params(3,480 行)的对应数据。

根因分析

调用链路:

deleteMany API(传入 7 个版本 ID)
  → FsseDataVersionServiceImpl.deleteMany()
    → 遍历 FinalTableEnum 所有终端表
      → FinalTableServiceImpl.deleteFinalTableDataByVersionCode()
        → DELETE FROM tableName WHERE version_code IN (:versionCodes)
    → 更新任务记录状态为 NOT_SAVED
  → 删除版本记录本身

被删除的 7 个版本:坡头巴里坤洋浦赤坎_v1、柯坪县_v1、民丰县_v1、沙湾市_v2、泽普县_v2、泽普县_v3、泽普县_测试。


环境信息

项目
服务器192.168.3.166
MySQL 版本5.7(Docker 容器 mysql_5.7
数据卷/data/mysql_5.7/data/var/lib/mysql
认证方式mysql -h 127.0.0.1 -u root -pyide123456(必须用 TCP,socket 连接会认证失败)
binlog 格式ROW
原始数据库data_center_test
恢复数据库data_center_test_recover

恢复流程

第一步:定位 binlog 文件

-- 登录 MySQL 查看 binlog 列表
SHOW BINARY LOGS;

-- 查看目标时间点附近的事件(先找大致位置)
SHOW BINLOG EVENTS IN 'mysql-bin.000003' LIMIT 100;

第二步:解析 binlog

注意:该 Docker 镜像是精简版,容器内没有 mysqlbinlog 工具。需要在宿主机安装 MySQL 5.7 客户端。MariaDB 自带的 mysqlbinlog 与 MySQL 5.7 binlog 不兼容(报错 event_type: 35)。

安装 MySQL 5.7 客户端(CentOS):

# 添加 MySQL 官方 repo
rpm -Uvh https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm

# 安装(跳过 GPG 校验,官方 key 已过期)
yum install --nogpgcheck mysql-community-client -y

解析 binlog 为可读文本:

mysqlbinlog \
  --base64-output=DECODE-ROWS -v \
  --start-datetime="2026-04-18 18:04:00" \
  --stop-datetime="2026-04-18 18:07:00" \
  /data/mysql_5.7/data/mysql-bin.000003 \
  > /tmp/binlog_0418.txt

第三步:将 DELETE 转换为 INSERT(闪回)

ROW 格式的 binlog 中,DELETE 事件包含被删除行的完整数据(在 WHERE 子句中)。通过 Python 脚本将 DELETE 反转为 INSERT:

脚本 flashback_parser.py

#!/usr/bin/env python3
"""
解析 mysqlbinlog --base64-output=DECODE-ROWS -v 的输出,
将 DELETE 行反转为 INSERT 语句,用于数据恢复。
"""
import sys
import re

# 根据实际表结构定义列名(顺序必须与 binlog @1, @2, ... 对应)
ADS_RPT_COLUMNS = [
    "id", "org_id", "org_name", "org_type_code", "org_type_name",
    "org_area_code", "org_area_province", "org_area_city", "org_area_county",
    "monitor_group_id", "monitor_group_name", "monitor_year",
    "rpt_object_code", "rpt_object_name", "rpt_object_alias",
    "rpt_area_province", "rpt_area_city", "rpt_area_county",
    "rpt_area_province_code", "rpt_area_city_code", "rpt_area_county_code",
    "metric_code", "metric_name",
    "item_pcode", "item_pname", "item_psort",
    "item_code", "item_name", "item_sort", "item_value", "value_unit",
    "data_source", "team_name", "team_level", "team_sort",
    "grade_code", "grade_name", "subject_code", "subject_name",
    "metric_version_code",
    "task_id", "job_name", "version_code", "create_time", "update_time"
]

RPT_TEMPLATE_COLUMNS = [
    "id", "rpt_object_code", "rpt_object_name", "rpt_object_alias",
    "rpt_area_province_code", "rpt_area_province",
    "rpt_area_city_code", "rpt_area_city",
    "rpt_area_county_code", "rpt_area_county",
    "rpt_param_code", "rpt_param_name", "rpt_param_value",
    "task_id", "job_name", "version_code", "create_time", "update_time"
]

def escape_sql(val):
    if val == 'NULL':
        return 'NULL'
    if val.startswith("'") and val.endswith("'"):
        inner = val[1:-1]
        inner = inner.replace("\\", "\\\\").replace("'", "\\'")
        return "'" + inner + "'"
    if val.startswith("b'"):
        return val
    return val

def parse_binlog_file(input_file, output_ads, output_rpt):
    current_table = None
    current_row = {}
    in_where = False
    ads_count = 0
    rpt_count = 0

    with open(input_file, 'r', encoding='utf-8', errors='replace') as f:
        for line in f:
            line = line.rstrip('\n')

            m = re.match(r'^### DELETE FROM `([^`]+)`\.`([^`]+)`', line)
            if m:
                if current_row and current_table:
                    write_insert(current_table, current_row, output_ads, output_rpt)
                    if current_table == 'ads_rpt_team_series':
                        ads_count += 1
                    else:
                        rpt_count += 1
                table_name = m.group(2)
                if table_name in ('ads_rpt_team_series', 'rpt_template_params'):
                    current_table = table_name
                    current_row = {}
                    in_where = False
                else:
                    current_table = None
                    current_row = {}
                continue

            if line.strip() == '### WHERE':
                in_where = True
                continue
            if line.strip() == '### SET':
                in_where = False
                continue

            if in_where and current_table:
                m2 = re.match(r'^###\s+@(\d+)=(.*)', line)
                if m2:
                    col_idx = int(m2.group(1))
                    val = m2.group(2).strip()
                    current_row[col_idx] = val

    if current_row and current_table:
        write_insert(current_table, current_row, output_ads, output_rpt)
        if current_table == 'ads_rpt_team_series':
            ads_count += 1
        else:
            rpt_count += 1

    return ads_count, rpt_count

def write_insert(table_name, row_data, output_ads, output_rpt):
    if table_name == 'ads_rpt_team_series':
        columns = ADS_RPT_COLUMNS
        out = output_ads
    elif table_name == 'rpt_template_params':
        columns = RPT_TEMPLATE_COLUMNS
        out = output_rpt
    else:
        return

    max_idx = max(row_data.keys())
    values = []
    cols_used = []
    for i in range(1, min(max_idx, len(columns)) + 1):
        cols_used.append(columns[i - 1])
        val = row_data.get(i, 'NULL')
        values.append(escape_sql(val))

    sql = "INSERT INTO `目标数据库`.`{}` ({}) VALUES ({});\n".format(
        table_name,
        ', '.join('`' + c + '`' for c in cols_used),
        ', '.join(values)
    )
    out.write(sql)

def main():
    input_file = '/tmp/binlog_0418.txt'
    ads_output = '/tmp/flashback_ads_rpt_team_series.sql'
    rpt_output = '/tmp/flashback_rpt_template_params.sql'

    print(f"Parsing {input_file} ...")
    with open(ads_output, 'w', encoding='utf-8') as f_ads, \
         open(rpt_output, 'w', encoding='utf-8') as f_rpt:
        f_ads.write("SET NAMES utf8mb4;\nSET FOREIGN_KEY_CHECKS=0;\nSET autocommit=0;\n\n")
        f_rpt.write("SET NAMES utf8mb4;\nSET FOREIGN_KEY_CHECKS=0;\nSET autocommit=0;\n\n")
        ads_count, rpt_count = parse_binlog_file(input_file, f_ads, f_rpt)
        f_ads.write("\nCOMMIT;\nSET FOREIGN_KEY_CHECKS=1;\n")
        f_rpt.write("\nCOMMIT;\nSET FOREIGN_KEY_CHECKS=1;\n")

    print(f"Done!")
    print(f"  ads_rpt_team_series: {ads_count} INSERT statements -> {ads_output}")
    print(f"  rpt_template_params: {rpt_count} INSERT statements -> {rpt_output}")

if __name__ == '__main__':
    main()

运行:

python3 flashback_parser.py

第四步:导入恢复数据

性能关键: SQL 文件头部必须加 SET autocommit=0;,尾部加 COMMIT;,否则逐条提交极慢(6 万行/小时 → 25 万行/几分钟)。

# 先创建恢复用的数据库和表(结构与原表一致)
mysql -h 127.0.0.1 -u root -p 目标数据库 < /tmp/flashback_ads_rpt_team_series.sql
mysql -h 127.0.0.1 -u root -p 目标数据库 < /tmp/flashback_rpt_template_params.sql

第五步:验证数据

-- 对比行数
SELECT COUNT(*) FROM 恢复库.ads_rpt_team_series;    -- 预期 252830
SELECT COUNT(*) FROM 恢复库.rpt_template_params;     -- 预期 3480

第六步:回写原库

-- 使用 INSERT IGNORE 避免主键冲突
INSERT IGNORE INTO data_center_test.ads_rpt_team_series
SELECT * FROM data_center_test_recover.ads_rpt_team_series;

INSERT IGNORE INTO data_center_test.rpt_template_params
SELECT * FROM data_center_test_recover.rpt_template_params;

踩坑记录

问题原因解决方案
mysql -u root -p socket 连接认证失败Docker MySQL 的 root 账户可能只授权了 TCP 连接使用 -h 127.0.0.1 强制 TCP 连接
容器内无 mysqlbinlog精简版 Docker 镜像在宿主机安装 MySQL 5.7 客户端
MariaDB mysqlbinlog 报错 event_type: 35MariaDB 与 MySQL 5.7 binlog 格式不兼容卸载 MariaDB 客户端,安装 MySQL 官方客户端
MySQL yum 安装 GPG 校验失败官方 GPG key 过期--nogpgcheck 参数
binlog2sql 闪回工具输出 0 字节兼容性问题自写 Python 解析脚本
导入极慢(6 万行/小时)默认 autocommit=1,逐条提交SQL 文件头加 SET autocommit=0;,尾加 COMMIT;
1 行报错 Unknown column 'col_19'该行是 binlog 事务边界处最后一条 DELETE,解析器误将后续 binlog 元数据当作额外字段从 binlog 原文提取真实数据,手动补录

适用场景

  • MySQL 5.7 ROW 格式 binlog
  • 误删数据(DELETE)的闪回恢复
  • 不依赖第三方工具(binlog2sql 等兼容性差时的备选方案)

局限性

  • 仅支持 DELETE 闪回(转 INSERT),不支持 UPDATE 闪回(需额外处理 SET/WHERE)
  • 需要知道表的列名和顺序(binlog 只记录 @1, @2, ... 位置号)
  • binlog 文件必须未被 purge

标签: none

上一篇: OpenClaw 模型厂商配置指南
下一篇: 没有了

添加新评论