侧边栏壁纸
博主头像
ZHD的小窝博主等级

行动起来,活在当下

  • 累计撰写 84 篇文章
  • 累计创建 54 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Apache Airflow的使用

江南的风
2024-07-12 / 0 评论 / 0 点赞 / 0 阅读 / 11733 字 / 正在检测是否收录...

Airflow 快速入门​

  1. 安装:pip install apache-airflow

  2. 初始化数据库:airflow db init

  3. 将上面的代码示例保存为 data_pipeline.py 放入 ~/airflow/dags 目录

  4. 启动:airflow webserver & airflow scheduler

  5. 在Web界面(localhost:8080)触发流水线执行!

  6. 创建用户:airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin

编码即流程

以下是结合S3文件触发机制和菱形依赖关系的完整Airflow DAG示例:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from datetime import datetime, timedelta

# 自定义辅助函数
def get_task2_path(context):
    """为任务2生成唯一的S3路径"""
    return f"output/task2/{context['dag_run'].run_id}.result"

def get_task3_path(context):
    """为任务3生成唯一的S3路径"""
    return f"output/task3/{context['dag_run'].run_id}.result"

def handle_task4():
    """任务4的处理函数"""
    print("任务2和任务3都已完成,开始执行最终处理")
    # 在这里添加合并结果的逻辑
    return "所有处理完成"

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    's3_diamond_workflow',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    max_active_runs=3
) as dag:
    
    # ===== 任务1:启动处理 =====
    start_processing = SimpleHttpOperator(
        task_id='task1_start_processing',
        method='POST',
        http_conn_id='api_endpoint',
        endpoint='/process',
        data='{'
             f'"task2_output": "{get_task2_path}",'
             f'"task3_output": "{get_task3_path}",'
             '"run_id": "{{ run_id }}"'
             '}',
        headers={"Content-Type": "application/json"},
        do_xcom_push=True
    )
    
    # ===== 任务2:处理分支1 =====
    # 2.1 等待S3文件生成 (分支1)
    wait_for_task2 = S3KeySensor(
        task_id='wait_for_task2_output',
        bucket_name='processing-bucket',
        bucket_key=get_task2_path,
        aws_conn_id='aws_default',
        poke_interval=60,
        timeout=3600,
        mode='reschedule'
    )
    
    # 2.2 处理任务2的文件
    process_task2 = PythonOperator(
        task_id='task2_process_results',
        python_callable=lambda **ctx: (
            f"处理任务2结果: {ctx['dag_run'].run_id}"
        )
    )
    
    # ===== 任务3:处理分支2 =====
    # 3.1 等待S3文件生成 (分支2)
    wait_for_task3 = S3KeySensor(
        task_id='wait_for_task3_output',
        bucket_name='processing-bucket',
        bucket_key=get_task3_path,
        aws_conn_id='aws_default',
        poke_interval=60,
        timeout=3600,
        mode='reschedule'
    )
    
    # 3.2 处理任务3的文件
    process_task3 = PythonOperator(
        task_id='task3_process_results',
        python_callable=lambda **ctx: (
            f"处理任务3结果: {ctx['dag_run'].run_id}"
        )
    )
    
    # ===== 任务4:最终处理(依赖任务2和任务3) =====
    final_processing = PythonOperator(
        task_id='task4_final_processing',
        python_callable=handle_task4
    )
    
    # ===== 可选:清理任务 =====
    clean_task2 = S3CopyObjectOperator(
        task_id='clean_task2_output',
        source_bucket_name='processing-bucket',
        source_bucket_key=get_task2_path,
        dest_bucket_name='archive-bucket',
        dest_bucket_key=lambda ctx: f"archived/{ctx['ds']}/" + get_task2_path(ctx),
        delete_src=True
    )
    
    clean_task3 = S3CopyObjectOperator(
        task_id='clean_task3_output',
        source_bucket_name='processing-bucket',
        source_bucket_key=get_task3_path,
        dest_bucket_name='archive-bucket',
        dest_bucket_key=lambda ctx: f"archived/{ctx['ds']}/" + get_task3_path(ctx),
        delete_src=True
    )
    
    # ===== 设置依赖关系 =====
    # 任务1完成后并行执行两个分支的等待任务
    start_processing >> [wait_for_task2, wait_for_task3]
    
    # 每个分支内部的文件等待 -> 处理
    wait_for_task2 >> process_task2
    wait_for_task3 >> process_task3
    
    # 两个分支都完成后执行最终处理
    [process_task2, process_task3] >> final_processing
    
    # 最终处理完成后清理文件(可选)
    final_processing >> [clean_task2, clean_task3]

# 说明: 实际环境中需要添加具体的文件处理函数
# 例如:
# def process_file_content(bucket, key):
#     from airflow.providers.amazon.aws.hooks.s3 import S3Hook
#     s3 = S3Hook('aws_default')
#     file_path = s3.download_file(key=key, bucket_name=bucket)
#     # ... 处理文件内容 ...

工作流执行过程详解

1. 流程结构

       [task1_start_processing]
            /         \
[wait_for_task2]    [wait_for_task3]
      |                  |
[task2_process_results] [task3_process_results]
            \         /
       [task4_final_processing]
            /         \
    [clean_task2]  [clean_task3]

2. 执行顺序

  1. ​任务1​​ 调用HTTP API启动处理:

    • 传递两个S3输出路径(分别给任务2和任务3)

    • 包含唯一标识符(run_id)

  2. ​任务2路径​

    • 等待 /output/task2/{run_id}.result 文件出现

    • 处理文件内容

    • 完成后触发清理操作

  3. ​任务3路径​

    • 等待 /output/task3/{run_id}.result 文件出现

    • 处理文件内容

    • 完成后触发清理操作

  4. ​任务4​

    • 等待任务2和任务3都完成

    • 执行最终汇总/合并操作

  5. ​清理任务​

    • 将处理完成的文件移动到归档存储桶

    • 删除源文件

生产环境增强建议

1. 文件处理函数(实际实现)

def process_task2_file(**context):
    """实际处理任务2文件的函数"""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    s3 = S3Hook('aws_default')
    bucket = 'processing-bucket'
    key = get_task2_path(context)
    
    try:
        # 下载文件
        file_path = s3.download_file(key=key, bucket_name=bucket)
        
        # 读取并处理文件内容
        with open(file_path) as f:
            content = f.read()
            result = f"任务2处理结果: {len(content)}字节"
            
        # 可选的: 存储结果供任务4使用
        context['ti'].xcom_push(key='task2_result', value=result)
        
        return result
    except Exception as e:
        # 错误处理和重试逻辑
        context['ti'].xcom_push(key='task2_error', value=str(e))
        raise

2. 任务合并增强

def handle_task4(**context):
    """增强的任务4处理函数 - 合并两个任务的结果"""
    
    # 从任务2和任务3获取结果
    task2_result = context['ti'].xcom_pull(
        task_ids='task2_process_results', 
        key='task2_result'
    )
    task3_result = context['ti'].xcom_pull(
        task_ids='task3_process_results', 
        key='task3_result'
    )
    
    # 错误检查
    task2_error = context['ti'].xcom_pull(
        task_ids='task2_process_results', 
        key='task2_error'
    )
    task3_error = context['ti'].xcom_pull(
        task_ids='task3_process_results', 
        key='task3_error'
    )
    
    if task2_error or task3_error:
        print(f"部分处理失败: task2={task2_error}, task3={task3_error}")
        # 可在此处添加部分失败处理逻辑
        return "部分成功"
    
    # 合并结果逻辑
    merged_result = f"任务2: {task2_result} | 任务3: {task3_result}"
    
    # 发送最终通知
    send_completion_notification(merged_result)
    
    return merged_result

def send_completion_notification(message):
    """发送完成通知(示例)"""
    # 实际实现可通过邮件、Slack、SMS等
    print(f"处理完成通知: {message}")

3. 错误处理增强

# 在S3传感器中添加错误处理
wait_for_task2 = S3KeySensor(
    # ...
    soft_fail=True,  # 文件不存在时任务标记为跳过
    on_retry_callback=alert_on_retry,
    on_failure_callback=alert_on_failure
)

def alert_on_retry(context):
    """重试时发送告警"""
    print(f"任务{context['task'].task_id}正在重试 - 尝试#{context['ti'].try_number}")

def alert_on_failure(context):
    """失败时发送告警"""
    error = context.get('exception') or "未知错误"
    print(f"!!! 任务失败告警: {context['task'].task_id} - {error}")

4. 动态任务跳过

# 在任务4中使用自定义触发器规则
from airflow.utils.trigger_rule import TriggerRule

final_processing = PythonOperator(
    task_id='task4_final_processing',
    python_callable=handle_task4,
    trigger_rule=TriggerRule.ALL_SUCCESS,  # 默认值
    # trigger_rule=TriggerRule.ONE_SUCCESS,  # 任一成功即执行
    # trigger_rule=TriggerRule.NONE_FAILED,    # 没有失败则执行
)

生产部署注意事项

  1. ​AWS权限配置​​:

    • 确保Airflow工作节点具有S3访问权限

    • 设置最小权限原则(仅限必要存储桶)

  2. ​执行超时处理​​:

    wait_for_task2 = S3KeySensor(
        timeout=3600 * 3,  # 3小时超时
        # ...
    )
  3. ​跨地区访问优化​​:

    # 在S3Hook中指定地区
    s3 = S3Hook('aws_default', region_name='us-west-2')
  4. ​文件处理批量化​​:

    # 使用S3Prefix传感器处理批量文件
    from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
    
    wait_for_task2 = S3PrefixSensor(
        bucket_name='processing-bucket',
        prefix='output/task2/',
        delimiter='/',
        aws_conn_id='aws_default',
        verify=False
    )
  5. ​结果持久化​​:

    # 将最终结果保存到数据库
    save_results = PythonOperator(
        task_id='save_results_to_db',
        python_callable=save_to_database,
        provide_context=True
    )
    
    final_processing >> save_results

这个实现结合了S3文件触发机制和任务依赖管理,完美满足您的菱形工作流需求。实际部署时,只需填充具体的文件处理逻辑和业务操作即可。

0

评论区