The export runs concurrently with one process per table, so you can follow progress per table, with one log file per table.
The --table argument selects the table(s) to export; if it is omitted, the whole database is exported.
Finally, the exported CSV files are packed into a single zip archive.
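For reference, an invocation might look like the following (a sketch: the script name export_tables.py and all connection values are placeholder assumptions, not part of the original post):

# MySQL: export two tables into ./dump and pack them into mydb.zip
python export_tables.py mysql 127.0.0.1 3306 root secret mydb --table orders,users --output_dir ./dump --zip_file mydb.zip

# ClickHouse (HTTP port): export every table in the database
python export_tables.py clickhouse 127.0.0.1 8123 default secret mydb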
#!/usr/bin/python
# coding=utf-8
# author:zhaohaidong
# datetime:2024/12/16 16:23
# software: PyCharm
# Copyright(C) 2020 zhaohaidong
"""
文件说明:
"""
import logging
import os
import sys
import time
import zipfile
import pandas as pd
import mysql.connector
from clickhouse_connect import get_client
from multiprocessing import Process


def setup_logging(log_file):
    # Configure per-table logging; force=True (Python 3.8+) replaces any
    # handlers a forked worker inherited from the parent process
    logging.basicConfig(
        filename=log_file,                                   # log file name
        level=logging.INFO,                                  # log level
        format='%(asctime)s - %(levelname)s - %(message)s',  # log format
        force=True
    )

def export_mysql_table(host, port, user, password, database, table_name, output_dir, log_file):
    """Export a single MySQL table to CSV."""
    setup_logging(log_file)
    connection = mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database
    )
    cursor = connection.cursor()
    logging.info(f"Exporting table {table_name}...")
    # Get the total row count so progress can be reported
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    total_rows = cursor.fetchone()[0]
    exported_rows = 0
    query = f"SELECT * FROM {table_name}"
    csv_file = os.path.join(output_dir, f"{table_name}.csv")
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        # Write the header row once
        pd.DataFrame(columns=columns).to_csv(f, index=False)
        # Fetch in fixed-size chunks so large tables never load fully into memory
        while True:
            rows = cursor.fetchmany(3000)
            if not rows:
                break
            df = pd.DataFrame(rows, columns=columns)
            df.to_csv(f, index=False, header=False, mode='a', encoding='utf-8')
            exported_rows += len(rows)
            progress = (exported_rows / total_rows) * 100 if total_rows else 100.0
            logging.info(f"Table {table_name} export progress: {progress:.2f}%")
    logging.info(f"Table {table_name} exported to {csv_file}\n")
    cursor.close()
    connection.close()

def export_clickhouse_table(host, port, user, password, database, table_name, output_dir, log_file):
    """Export a single ClickHouse table to CSV."""
    setup_logging(log_file)
    logging.info(f"Exporting table {table_name}...")
    client = get_client(host=host, port=port, user=user, password=password, database=database)
    # Get the total row count so progress can be reported
    # (command() may return a string over HTTP, so cast explicitly)
    total_rows = int(client.command(f"SELECT COUNT(*) FROM {table_name}"))
    exported_rows = 0
    query = f"SELECT * FROM {table_name}"
    csv_file = os.path.join(output_dir, f"{table_name}.csv")
    # Fetch the column names cheaply before streaming the data
    columns = client.query(f"SELECT * FROM {table_name} LIMIT 0").column_names
    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        # Write the header row once
        pd.DataFrame(columns=columns).to_csv(f, index=False)
        # Stream the result in row blocks instead of materializing it;
        # a plain client.query() result cannot be re-streamed afterwards
        with client.query_row_block_stream(query) as stream:
            for block in stream:
                df = pd.DataFrame(block, columns=columns)
                df.to_csv(f, index=False, header=False, mode='a', encoding='utf-8')
                exported_rows += len(block)
                progress = (exported_rows / total_rows) * 100 if total_rows else 100.0
                logging.info(f"Table {table_name} export progress: {progress:.2f}%")
    logging.info(f"Table {table_name} exported to {csv_file}\n")

def export_mysql_to_csv(host, port, user, password, database, table=None, output_dir='.'):
    """Export MySQL tables to CSV, one process per table."""
    connection = mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database
    )
    cursor = connection.cursor()
    if table:
        tables = table.split(',')
    else:
        cursor.execute("SHOW TABLES")
        tables = [row[0] for row in cursor.fetchall()]
    cursor.close()
    connection.close()
    processes = []
    for table_name in tables:
        log_file = f"{table_name}.log"
        process = Process(
            target=export_mysql_table,
            args=(host, port, user, password, database, table_name, output_dir, log_file)
        )
        processes.append(process)
        process.start()
    for process in processes:
        process.join()

def export_clickhouse_to_csv(host, port, user, password, database, table=None, output_dir='.'):
    """Export ClickHouse tables to CSV, one process per table."""
    client = get_client(host=host, port=port, user=user, password=password, database=database)
    if table:
        # Accept a comma-separated list, matching the MySQL path
        tables = table.split(',')
    else:
        # query() yields one row per table; command() would return a single string
        tables = [row[0] for row in client.query("SHOW TABLES").result_rows]
    processes = []
    for table_name in tables:
        log_file = f"{table_name}.log"
        process = Process(
            target=export_clickhouse_table,
            args=(host, port, user, password, database, table_name, output_dir, log_file)
        )
        processes.append(process)
        process.start()
    for process in processes:
        process.join()

def zip_csv_files(output_dir, zip_file):
    # Build the archive directly inside output_dir; creating it elsewhere and
    # moving it afterwards fails when output_dir is the current directory
    zip_path = os.path.join(output_dir, zip_file)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(output_dir):
            for file in files:
                # Archive only the exported CSV files
                if file.endswith('.csv'):
                    zipf.write(os.path.join(root, file), file)
    logging.info(f"Created zip file {zip_path}")
    # Delete the original CSV files now that they are archived
    for root, dirs, files in os.walk(output_dir):
        for file in files:
            if file.endswith('.csv'):
                os.remove(os.path.join(root, file))
    logging.info(f"Deleted original CSV files in {output_dir}")

def main(db_type, host, port, user, password, database, table=None, output_dir='.', zip_file='database.zip'):
    """Dispatch to the right exporter, then zip the results."""
    # Log to the console in the parent process (workers log to per-table files)
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    # Make sure the output directory exists before the workers write into it
    os.makedirs(output_dir, exist_ok=True)
    if db_type == 'mysql':
        export_mysql_to_csv(host, port, user, password, database, table, output_dir)
    elif db_type == 'clickhouse':
        export_clickhouse_to_csv(host, port, user, password, database, table, output_dir)
    else:
        logging.error(f"Unsupported database type: {db_type}")
        sys.exit(1)
    # Workers are already joined; short grace period before zipping
    time.sleep(3)
    zip_csv_files(output_dir, zip_file)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Export database tables to CSV and zip them.")
    parser.add_argument('db_type', choices=['mysql', 'clickhouse'], help="Database type (mysql or clickhouse)")
    parser.add_argument('host', help="Database host")
    parser.add_argument('port', type=int, help="Database port")
    parser.add_argument('user', help="Database user")
    parser.add_argument('password', help="Database password")
    parser.add_argument('database', help="Comma-separated table(s) to export (optional; default: all tables)")
    parser.add_argument('--table', help="Specific table(s) to export, comma-separated (optional; default: all tables)")
    parser.add_argument('--output_dir', default='.', help="Output directory for CSV files (default: current directory)")
    parser.add_argument('--zip_file', default='database.zip', help="Output zip file name (default: database.zip)")
    args = parser.parse_args()
    main(args.db_type, args.host, args.port, args.user, args.password, args.database, args.table, args.output_dir, args.zip_file)
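
As a quick sanity check after a run, you can list what was packed (a minimal sketch using the standard zipfile module; the archive name below assumes the script's default):

import zipfile

# List the CSV files packed into the default archive
with zipfile.ZipFile('database.zip') as zf:
    print(zf.namelist())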