MySQL Binlog 数据恢复方案
AI摘要
本文介绍了一种基于MySQL Binlog的闪回恢复方案,用于恢复因误操作而删除的数据。方案包括定位binlog文件、解析binlog、将DELETE事件转换为INSERT、导入恢复数据、验证数据和回写原库等步骤。同时,提供了Python脚本用于将binlog中的DELETE事件转换为INSERT语句,并讨论了方案的适用场景和局限性。
背景
2026-04-18 18:05:15,FSSE 数据中心系统通过 FsseDataVersionApiService.deleteMany() 接口删除了 7 个数据版本,触发级联删除,清空了终端表 ads_rpt_team_series(252,830 行)和 rpt_template_params(3,480 行)的对应数据。
根因分析
调用链路:
deleteMany API(传入 7 个版本 ID)
→ FsseDataVersionServiceImpl.deleteMany()
→ 遍历 FinalTableEnum 所有终端表
→ FinalTableServiceImpl.deleteFinalTableDataByVersionCode()
→ DELETE FROM tableName WHERE version_code IN (:versionCodes)
→ 更新任务记录状态为 NOT_SAVED
→ 删除版本记录本身被删除的 7 个版本:坡头巴里坤洋浦赤坎_v1、柯坪县_v1、民丰县_v1、沙湾市_v2、泽普县_v2、泽普县_v3、泽普县_测试。
环境信息
| 项目 | 值 |
|---|---|
| 服务器 | 192.168.3.166 |
| MySQL 版本 | 5.7(Docker 容器 mysql_5.7) |
| 数据卷 | /data/mysql_5.7/data → /var/lib/mysql |
| 认证方式 | mysql -h 127.0.0.1 -u root -pyide123456(必须用 TCP,socket 连接会认证失败) |
| binlog 格式 | ROW |
| 原始数据库 | data_center_test |
| 恢复数据库 | data_center_test_recover |
恢复流程
第一步:定位 binlog 文件
-- 登录 MySQL 查看 binlog 列表
SHOW BINARY LOGS;
-- 查看目标时间点附近的事件(先找大致位置)
SHOW BINLOG EVENTS IN 'mysql-bin.000003' LIMIT 100;第二步:解析 binlog
注意:该 Docker 镜像是精简版,容器内没有mysqlbinlog工具。需要在宿主机安装 MySQL 5.7 客户端。MariaDB 自带的 mysqlbinlog 与 MySQL 5.7 binlog 不兼容(报错event_type: 35)。
安装 MySQL 5.7 客户端(CentOS):
# 添加 MySQL 官方 repo
rpm -Uvh https://dev.mysql.com/get/mysql57-community-release-el7-11.noarch.rpm
# 安装(跳过 GPG 校验,官方 key 已过期)
yum install --nogpgcheck mysql-community-client -y解析 binlog 为可读文本:
mysqlbinlog \
--base64-output=DECODE-ROWS -v \
--start-datetime="2026-04-18 18:04:00" \
--stop-datetime="2026-04-18 18:07:00" \
/data/mysql_5.7/data/mysql-bin.000003 \
> /tmp/binlog_0418.txt第三步:将 DELETE 转换为 INSERT(闪回)
ROW 格式的 binlog 中,DELETE 事件包含被删除行的完整数据(在 WHERE 子句中)。通过 Python 脚本将 DELETE 反转为 INSERT:
脚本 flashback_parser.py:
#!/usr/bin/env python3
"""
解析 mysqlbinlog --base64-output=DECODE-ROWS -v 的输出,
将 DELETE 行反转为 INSERT 语句,用于数据恢复。
"""
import sys
import re
# 根据实际表结构定义列名(顺序必须与 binlog @1, @2, ... 对应)
ADS_RPT_COLUMNS = [
"id", "org_id", "org_name", "org_type_code", "org_type_name",
"org_area_code", "org_area_province", "org_area_city", "org_area_county",
"monitor_group_id", "monitor_group_name", "monitor_year",
"rpt_object_code", "rpt_object_name", "rpt_object_alias",
"rpt_area_province", "rpt_area_city", "rpt_area_county",
"rpt_area_province_code", "rpt_area_city_code", "rpt_area_county_code",
"metric_code", "metric_name",
"item_pcode", "item_pname", "item_psort",
"item_code", "item_name", "item_sort", "item_value", "value_unit",
"data_source", "team_name", "team_level", "team_sort",
"grade_code", "grade_name", "subject_code", "subject_name",
"metric_version_code",
"task_id", "job_name", "version_code", "create_time", "update_time"
]
RPT_TEMPLATE_COLUMNS = [
"id", "rpt_object_code", "rpt_object_name", "rpt_object_alias",
"rpt_area_province_code", "rpt_area_province",
"rpt_area_city_code", "rpt_area_city",
"rpt_area_county_code", "rpt_area_county",
"rpt_param_code", "rpt_param_name", "rpt_param_value",
"task_id", "job_name", "version_code", "create_time", "update_time"
]
def escape_sql(val):
if val == 'NULL':
return 'NULL'
if val.startswith("'") and val.endswith("'"):
inner = val[1:-1]
inner = inner.replace("\\", "\\\\").replace("'", "\\'")
return "'" + inner + "'"
if val.startswith("b'"):
return val
return val
def parse_binlog_file(input_file, output_ads, output_rpt):
current_table = None
current_row = {}
in_where = False
ads_count = 0
rpt_count = 0
with open(input_file, 'r', encoding='utf-8', errors='replace') as f:
for line in f:
line = line.rstrip('\n')
m = re.match(r'^### DELETE FROM `([^`]+)`\.`([^`]+)`', line)
if m:
if current_row and current_table:
write_insert(current_table, current_row, output_ads, output_rpt)
if current_table == 'ads_rpt_team_series':
ads_count += 1
else:
rpt_count += 1
table_name = m.group(2)
if table_name in ('ads_rpt_team_series', 'rpt_template_params'):
current_table = table_name
current_row = {}
in_where = False
else:
current_table = None
current_row = {}
continue
if line.strip() == '### WHERE':
in_where = True
continue
if line.strip() == '### SET':
in_where = False
continue
if in_where and current_table:
m2 = re.match(r'^###\s+@(\d+)=(.*)', line)
if m2:
col_idx = int(m2.group(1))
val = m2.group(2).strip()
current_row[col_idx] = val
if current_row and current_table:
write_insert(current_table, current_row, output_ads, output_rpt)
if current_table == 'ads_rpt_team_series':
ads_count += 1
else:
rpt_count += 1
return ads_count, rpt_count
def write_insert(table_name, row_data, output_ads, output_rpt):
if table_name == 'ads_rpt_team_series':
columns = ADS_RPT_COLUMNS
out = output_ads
elif table_name == 'rpt_template_params':
columns = RPT_TEMPLATE_COLUMNS
out = output_rpt
else:
return
max_idx = max(row_data.keys())
values = []
cols_used = []
for i in range(1, min(max_idx, len(columns)) + 1):
cols_used.append(columns[i - 1])
val = row_data.get(i, 'NULL')
values.append(escape_sql(val))
sql = "INSERT INTO `目标数据库`.`{}` ({}) VALUES ({});\n".format(
table_name,
', '.join('`' + c + '`' for c in cols_used),
', '.join(values)
)
out.write(sql)
def main():
input_file = '/tmp/binlog_0418.txt'
ads_output = '/tmp/flashback_ads_rpt_team_series.sql'
rpt_output = '/tmp/flashback_rpt_template_params.sql'
print(f"Parsing {input_file} ...")
with open(ads_output, 'w', encoding='utf-8') as f_ads, \
open(rpt_output, 'w', encoding='utf-8') as f_rpt:
f_ads.write("SET NAMES utf8mb4;\nSET FOREIGN_KEY_CHECKS=0;\nSET autocommit=0;\n\n")
f_rpt.write("SET NAMES utf8mb4;\nSET FOREIGN_KEY_CHECKS=0;\nSET autocommit=0;\n\n")
ads_count, rpt_count = parse_binlog_file(input_file, f_ads, f_rpt)
f_ads.write("\nCOMMIT;\nSET FOREIGN_KEY_CHECKS=1;\n")
f_rpt.write("\nCOMMIT;\nSET FOREIGN_KEY_CHECKS=1;\n")
print(f"Done!")
print(f" ads_rpt_team_series: {ads_count} INSERT statements -> {ads_output}")
print(f" rpt_template_params: {rpt_count} INSERT statements -> {rpt_output}")
if __name__ == '__main__':
main()运行:
python3 flashback_parser.py第四步:导入恢复数据
性能关键: SQL 文件头部必须加 SET autocommit=0;,尾部加 COMMIT;,否则逐条提交极慢(6 万行/小时 → 25 万行/几分钟)。
# 先创建恢复用的数据库和表(结构与原表一致)
mysql -h 127.0.0.1 -u root -p 目标数据库 < /tmp/flashback_ads_rpt_team_series.sql
mysql -h 127.0.0.1 -u root -p 目标数据库 < /tmp/flashback_rpt_template_params.sql第五步:验证数据
-- 对比行数
SELECT COUNT(*) FROM 恢复库.ads_rpt_team_series; -- 预期 252830
SELECT COUNT(*) FROM 恢复库.rpt_template_params; -- 预期 3480第六步:回写原库
-- 使用 INSERT IGNORE 避免主键冲突
INSERT IGNORE INTO data_center_test.ads_rpt_team_series
SELECT * FROM data_center_test_recover.ads_rpt_team_series;
INSERT IGNORE INTO data_center_test.rpt_template_params
SELECT * FROM data_center_test_recover.rpt_template_params;踩坑记录
| 问题 | 原因 | 解决方案 |
|---|---|---|
mysql -u root -p socket 连接认证失败 | Docker MySQL 的 root 账户可能只授权了 TCP 连接 | 使用 -h 127.0.0.1 强制 TCP 连接 |
容器内无 mysqlbinlog | 精简版 Docker 镜像 | 在宿主机安装 MySQL 5.7 客户端 |
MariaDB mysqlbinlog 报错 event_type: 35 | MariaDB 与 MySQL 5.7 binlog 格式不兼容 | 卸载 MariaDB 客户端,安装 MySQL 官方客户端 |
| MySQL yum 安装 GPG 校验失败 | 官方 GPG key 过期 | 加 --nogpgcheck 参数 |
binlog2sql 闪回工具输出 0 字节 | 兼容性问题 | 自写 Python 解析脚本 |
| 导入极慢(6 万行/小时) | 默认 autocommit=1,逐条提交 | SQL 文件头加 SET autocommit=0;,尾加 COMMIT; |
1 行报错 Unknown column 'col_19' | 该行是 binlog 事务边界处最后一条 DELETE,解析器误将后续 binlog 元数据当作额外字段 | 从 binlog 原文提取真实数据,手动补录 |
适用场景
- MySQL 5.7 ROW 格式 binlog
- 误删数据(DELETE)的闪回恢复
- 不依赖第三方工具(binlog2sql 等兼容性差时的备选方案)
局限性
- 仅支持 DELETE 闪回(转 INSERT),不支持 UPDATE 闪回(需额外处理 SET/WHERE)
- 需要知道表的列名和顺序(binlog 只记录 @1, @2, ... 位置号)
- binlog 文件必须未被 purge