本文最后更新于413 天前,其中的信息可能已经过时,如有错误请发送邮件到2446865563@qq.com
1.简介
【脚本简介】这是一款将一个MySQL数据库的数据完全同步到另一个MySQL数据库中的Python脚本。执行该脚本前需要保证目标数据库已经存在,并且目标数据库中要么没有表格,要么其表格结构与源数据库中对应表格的结构相同,否则将报错。此外,该脚本可以调整每个批次传输的记录条数,具体数值可根据数据库的总数据量确定。
【脚本主要使用场景】该脚本可用于MySQL异地备份的场景,将服务器上的数据库备份到其他地方的数据库中。
2.代码
"""
@event: 同步两个mysql数据库。可以是不同域名、不同用户之间的,只需要目标数据库存在即可,表格也会自动创建
@envs:
- pip install pymysql==1.1.0
@params:
(Db字典结构: host 域名;port mysql服务的端口;dbName 数据库名字;user 用户名;passwd 用户密码;)
- srcDb:dict 原mysql数据库信息
- dstDb:dict 目标mysql数据库信息
- batchCnt:int 每个批次写入目标数据库的记录条数
- tableNameFilter:typing.Callable 过滤tableName,该函数返回True则同步处理,反之不处理,默认lambda tableName:True
@return:
- None
@warning:
@eg:
if __name__=="__main__":
SrcDb={
"host":"172.10.10.10",
"port":13306,
"dbName":"src_db",
"user":"root",
"passwd":"1111"
}
DstDb={
"host":"172.10.10.10",
"port":13306,
"dbName":"dst_db",
"user":"root",
"passwd":"1111"
}
BatchCnt=200
sync2MysqlDbs(SrcDb,DstDb,BatchCnt,tableNameFilter=lambda name:name!="table1")
"""
import pymysql
import re, typing
def _quoteIdentifier(identifier):
    """Return *identifier* wrapped in backticks, doubling any embedded backtick."""
    return "`" + identifier.replace("`", "``") + "`"


def _syncTable(cursor, dstCursor, dstConn, tableName, existsInDst, batchCnt):
    """Copy the rows of one source table that the destination does not hold yet.

    Args:
        cursor: Dict cursor on the source connection.
        dstCursor: Dict cursor on the destination connection.
        dstConn: Destination connection; committed after every written batch.
        tableName: Name of the table to copy.
        existsInDst: Whether the table was already present in the destination
            when the sync started. When false the table is created first.
        batchCnt: Maximum number of rows sent per ``executemany`` call.
    """
    quotedTable = _quoteIdentifier(tableName)

    # Create the table in the destination from the source's own DDL.
    if not existsInDst:
        cursor.execute(f"SHOW CREATE TABLE {quotedTable}")
        dstCursor.execute(cursor.fetchone()["Create Table"])

    cursor.execute(f"SHOW COLUMNS FROM {quotedTable}")
    columns = cursor.fetchall()
    allFields = [column["Field"] for column in columns]

    # Generated columns cannot be given explicit values in an INSERT, so they
    # are left out of the column list. "DEFAULT_GENERATED" (expression
    # defaults) is deliberately not matched by these two markers.
    fields = []
    for column in columns:
        extra = (column.get("Extra") or "").upper()
        if "VIRTUAL GENERATED" in extra or "STORED GENERATED" in extra:
            continue
        fields.append(column["Field"])

    # Rows are identified by their primary key columns. A table without a
    # primary key falls back to comparing whole rows so it can still be synced
    # instead of aborting the run.
    keyFields = [column["Field"] for column in columns if column["Key"] == "PRI"]
    if not keyFields:
        keyFields = allFields

    # Key tuples already stored in the destination. Only the key columns are
    # fetched; a table that was just created is known to be empty.
    exists = set()
    if existsInDst:
        keyColumnsSql = ", ".join(_quoteIdentifier(key) for key in keyFields)
        dstCursor.execute(f"SELECT {keyColumnsSql} FROM {quotedTable}")
        exists = {
            tuple(row[key] for key in keyFields) for row in dstCursor.fetchall()
        }

    columnsSql = ", ".join(_quoteIdentifier(field) for field in fields)
    placeholders = ", ".join(["%s"] * len(fields))
    batchInsertSql = (
        f"INSERT INTO {quotedTable} ({columnsSql}) VALUES ({placeholders})"
    )

    cursor.execute(f"SELECT * FROM {quotedTable}")
    data = []
    batchIndex = 0
    for item in cursor.fetchall():
        if tuple(item[key] for key in keyFields) in exists:
            continue
        data.append(tuple(item[field] for field in fields))
        # Flush on the number of pending rows, so skipped rows can never
        # trigger an empty or repeated batch.
        if len(data) >= batchCnt:
            dstCursor.executemany(batchInsertSql, data)
            dstConn.commit()
            data.clear()
            batchIndex += 1
            print(f"{tableName}-第{batchIndex}批次存储完成")
    if data:
        # Write the final, partially filled batch.
        dstCursor.executemany(batchInsertSql, data)
        dstConn.commit()
        data.clear()
    print(f"=> {tableName} 批量存储完成\n")


def sync2MysqlDbs(
    srcDb, dstDb, batchCnt, tableNameFilter: typing.Callable = lambda tableName: True
):
    """Copy every missing row from one MySQL database into another.

    For each base table of the source database accepted by ``tableNameFilter``,
    the table is created in the destination if it is absent, and every source
    row whose primary key is not yet present in the destination is inserted.
    Rows that already exist in the destination are neither updated nor deleted.
    Views are skipped. Tables without a primary key are compared row by row.

    Args:
        srcDb: Source connection settings, a dict with the keys ``host``,
            ``port``, ``dbName``, ``user`` and ``passwd``.
        dstDb: Destination connection settings, same keys as ``srcDb``.
        batchCnt: Number of rows written per batch; must be at least 1.
        tableNameFilter: Called with each source table name; the table is
            synced only when the call returns a truthy value.

    Returns:
        None.

    Raises:
        ValueError: If ``batchCnt`` is smaller than 1.
        KeyError: If a connection settings dict lacks one of the required keys.
    """
    if batchCnt < 1:
        raise ValueError("batchCnt must be a positive integer")

    def getConnection(type_="src"):
        """Open a dict-cursor connection to the source ("src") or destination."""
        dbInfo = srcDb if type_ == "src" else dstDb
        return pymysql.connect(
            host=dbInfo["host"],
            user=dbInfo["user"],
            password=dbInfo["passwd"],
            database=dbInfo["dbName"],
            port=dbInfo["port"],
            cursorclass=pymysql.cursors.DictCursor,
        )

    srcConn = getConnection("src")
    try:
        dstConn = getConnection("dst")
        try:
            with srcConn.cursor() as cursor, dstConn.cursor() as dstCursor:
                # Only base tables: views have no "Create Table" statement.
                # The table name is the first column of each result row.
                cursor.execute("SHOW FULL TABLES WHERE Table_type = 'BASE TABLE'")
                tableNames = [next(iter(row.values())) for row in cursor.fetchall()]
                tableNames = [name for name in tableNames if tableNameFilter(name)]

                dstCursor.execute("SHOW TABLES")
                dstTableNames = {
                    next(iter(row.values())) for row in dstCursor.fetchall()
                }

                for tableName in tableNames:
                    print(f"-> 开始同步 {tableName}")
                    _syncTable(
                        cursor,
                        dstCursor,
                        dstConn,
                        tableName,
                        tableName in dstTableNames,
                        batchCnt,
                    )
                print("=> 全部存储完成")
        finally:
            # Close the connections even when a statement above raised.
            dstConn.close()
    finally:
        srcConn.close()
    print("=> 连接关闭完成")
if __name__ == "__main__":
    # Connection settings of the database that is read from.
    SrcDb = dict(
        host="172.10.10.10",
        port=13306,
        dbName="src_db",
        user="root",
        passwd="1111",
    )
    # The destination uses the same server and credentials; only the
    # database name differs.
    DstDb = {**SrcDb, "dbName": "dst_dst"}
    # Number of rows written to the destination per batch.
    BatchCnt = 200
    # Sync every table except "table1".
    sync2MysqlDbs(
        SrcDb,
        DstDb,
        BatchCnt,
        tableNameFilter=lambda name: not name == "table1",
    )










