侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 79 篇文章
  • 累计创建 53 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Python导出数据库到csv

江南的风
2025-01-08 / 0 评论 / 0 点赞 / 35 阅读 / 7709 字 / 正在检测是否收录...

采用多进程并发导出方式,可查看日志和进度,一表一日志;

--table 参数指定要导出的表,如果不传这个参数导出整个数据库。

最后指定打包文件可以打包到一个zip文件

#!/usr/bin/python
# coding=utf-8
# author:zhaohaidong
# datetime:2024/12/16 16:23
# software: PyCharm
# Copyright(C) 2020 zhaohaidong
"""
文件说明:

"""
import logging
import os
import shutil
import sys
import time
import zipfile
import pandas as pd
import mysql.connector
from clickhouse_connect import get_client
from multiprocessing import Process, Queue


def setup_logging(log_file):
    # 配置日志记录
    logging.basicConfig(
        filename=log_file,  # 指定日志文件名
        level=logging.INFO,  # 设置日志级别
        format='%(asctime)s - %(levelname)s - %(message)s'  # 设置日志格式
    )

def export_mysql_table(host, port, user, password, database, table_name, output_dir, log_file):
    """导出单个 MySQL 表到 CSV"""
    setup_logging(log_file)

    connection = mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database
    )
    cursor = connection.cursor()
    logging.info(f"正在导出表 {table_name}...")

    # 获取总行数以计算进度
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    total_rows = cursor.fetchone()[0]
    exported_rows = 0

    query = f"SELECT * FROM {table_name}"
    csv_file = os.path.join(output_dir, f"{table_name}.csv")

    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        cursor.execute(query)
        columns = [desc[0] for desc in cursor.description]
        writer = pd.DataFrame([], columns=columns).to_csv(f, index=False)

        while True:
            rows = cursor.fetchmany(3000)
            if not rows:
                break
            df = pd.DataFrame(rows, columns=columns)
            df.to_csv(f, index=False, header=False, mode='a', encoding='utf-8')
            exported_rows += len(rows)
            progress = (exported_rows / total_rows) * 100
            logging.info(f"表 {table_name} 导出进度: {progress:.2f}%")

    logging.info(f"表 {table_name} 导出完成,保存到 {csv_file}\n")
    cursor.close()
    connection.close()

def export_clickhouse_table(host, port, user, password, database, table_name, output_dir, log_file):
    """导出单个 ClickHouse 表到 CSV"""
    setup_logging(log_file)
    logging.info(f"正在导出表 {table_name}...")

    client = get_client(host=host, port=port, user=user, password=password, database=database)

    # 获取总行数以计算进度
    total_rows = client.command(f"SELECT COUNT(*) FROM {table_name}")
    exported_rows = 0

    query = f"SELECT * FROM {table_name}"
    csv_file = os.path.join(output_dir, f"{table_name}.csv")

    with open(csv_file, 'w', newline='', encoding='utf-8') as f:
        result = client.query(query)
        columns = result.column_names
        writer = pd.DataFrame([], columns=columns).to_csv(f, index=False)

        for rows in result.stream_rows():
            df = pd.DataFrame(rows, columns=columns)
            df.to_csv(f, index=False, header=False, mode='a', encoding='utf-8')
            exported_rows += len(rows)
            progress = (exported_rows / total_rows) * 100
            logging.info(f"表 {table_name} 导出进度: {progress:.2f}%")

    logging.info(f"表 {table_name} 导出完成,保存到 {csv_file}\n")

def export_mysql_to_csv(host, port, user, password, database, table=None, output_dir='.'):
    """多进程导出 MySQL 表到 CSV"""
    connection = mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database
    )
    cursor = connection.cursor()

    if table:
        tables = table.split(',')
    else:
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]

    cursor.close()
    connection.close()

    processes = []
    for table_name in tables:
        log_file = f"{table_name}.log"
        process = Process(
            target=export_mysql_table,
            args=(host, port, user, password, database, table_name, output_dir, log_file)
        )
        processes.append(process)
        process.start()

    for process in processes:
        process.join()


def export_clickhouse_to_csv(host, port, user, password, database, table=None, output_dir='.'):
    """多进程导出 ClickHouse 表到 CSV"""
    client = get_client(host=host, port=port, user=user, password=password, database=database)

    if table:
        tables = [table]
    else:
        tables = client.command("SHOW TABLES")

    processes = []
    for table_name in tables:
        log_file = f"{table_name}.log"
        process = Process(
            target=export_clickhouse_table,
            args=(host, port, user, password, database, table_name, output_dir, log_file)
        )
        processes.append(process)
        process.start()

    for process in processes:
        process.join()


def zip_csv_files(output_dir, zip_file):
    with zipfile.ZipFile(zip_file, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, dirs, files in os.walk(output_dir):
            for file in files:
                # 打印文件名
                if file.endswith('.csv'):
                    zipf.write(os.path.join(root, file), file)
    logging.info(f"Created zip file {zip_file}")

    # move zip file to output_dir
    shutil.move(zip_file, os.path.join(output_dir, zip_file))

    # Delete original CSV files
    for root, dirs, files in os.walk(output_dir):
        for file in files:
            if file.endswith('.csv'):
                os.remove(os.path.join(root, file))
    logging.info(f"Deleted original CSV files in {output_dir}")


def main(db_type, host, port, user, password, database, table=None, output_dir='.', zip_file='database.zip'):
    """主函数"""
    if db_type == 'mysql':
        export_mysql_to_csv(host, port, user, password, database, table, output_dir)
    elif db_type == 'clickhouse':
        export_clickhouse_to_csv(host, port, user, password, database, table, output_dir)
    else:
        logging.error(f"Unsupported database type: {db_type}")
        sys.exit(1)

    time.sleep(3)

    zip_csv_files(output_dir, zip_file)

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Export database tables to CSV and zip them.")
    parser.add_argument('db_type', choices=['mysql', 'clickhouse'], help="Database type (mysql or clickhouse)")
    parser.add_argument('host', help="Database host")
    parser.add_argument('port', type=int, help="Database port")
    parser.add_argument('user', help="Database user")
    parser.add_argument('password', help="Database password")
    parser.add_argument('database', help="Database name")
    parser.add_argument('--table', help="Specific table to export (optional)")
    parser.add_argument('--output_dir', default='.', help="Output directory for CSV files (default: current directory)")
    parser.add_argument('--zip_file', default='database.zip', help="Output zip file name (default: database.zip)")


    args = parser.parse_args()
    main(args.db_type, args.host, args.port, args.user, args.password, args.database, args.table, args.output_dir, args.zip_file)

0

评论区