Airflow 快速入门
安装:
pip install apache-airflow
初始化数据库:
airflow db init
将上面的代码示例保存为
data_pipeline.py
放入~/airflow/dags
目录启动:
airflow webserver
&airflow scheduler
在Web界面(localhost:8080)触发流水线执行!
创建用户:airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
编码即流程
以下是结合S3文件触发机制和菱形依赖关系的完整Airflow DAG示例:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from datetime import datetime, timedelta
# 自定义辅助函数
def get_task2_path(context):
"""为任务2生成唯一的S3路径"""
return f"output/task2/{context['dag_run'].run_id}.result"
def get_task3_path(context):
"""为任务3生成唯一的S3路径"""
return f"output/task3/{context['dag_run'].run_id}.result"
def handle_task4():
"""任务4的处理函数"""
print("任务2和任务3都已完成,开始执行最终处理")
# 在这里添加合并结果的逻辑
return "所有处理完成"
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
with DAG(
's3_diamond_workflow',
default_args=default_args,
schedule_interval='@daily',
catchup=False,
max_active_runs=3
) as dag:
# ===== 任务1:启动处理 =====
start_processing = SimpleHttpOperator(
task_id='task1_start_processing',
method='POST',
http_conn_id='api_endpoint',
endpoint='/process',
data='{'
f'"task2_output": "{get_task2_path}",'
f'"task3_output": "{get_task3_path}",'
'"run_id": "{{ run_id }}"'
'}',
headers={"Content-Type": "application/json"},
do_xcom_push=True
)
# ===== 任务2:处理分支1 =====
# 2.1 等待S3文件生成 (分支1)
wait_for_task2 = S3KeySensor(
task_id='wait_for_task2_output',
bucket_name='processing-bucket',
bucket_key=get_task2_path,
aws_conn_id='aws_default',
poke_interval=60,
timeout=3600,
mode='reschedule'
)
# 2.2 处理任务2的文件
process_task2 = PythonOperator(
task_id='task2_process_results',
python_callable=lambda **ctx: (
f"处理任务2结果: {ctx['dag_run'].run_id}"
)
)
# ===== 任务3:处理分支2 =====
# 3.1 等待S3文件生成 (分支2)
wait_for_task3 = S3KeySensor(
task_id='wait_for_task3_output',
bucket_name='processing-bucket',
bucket_key=get_task3_path,
aws_conn_id='aws_default',
poke_interval=60,
timeout=3600,
mode='reschedule'
)
# 3.2 处理任务3的文件
process_task3 = PythonOperator(
task_id='task3_process_results',
python_callable=lambda **ctx: (
f"处理任务3结果: {ctx['dag_run'].run_id}"
)
)
# ===== 任务4:最终处理(依赖任务2和任务3) =====
final_processing = PythonOperator(
task_id='task4_final_processing',
python_callable=handle_task4
)
# ===== 可选:清理任务 =====
clean_task2 = S3CopyObjectOperator(
task_id='clean_task2_output',
source_bucket_name='processing-bucket',
source_bucket_key=get_task2_path,
dest_bucket_name='archive-bucket',
dest_bucket_key=lambda ctx: f"archived/{ctx['ds']}/" + get_task2_path(ctx),
delete_src=True
)
clean_task3 = S3CopyObjectOperator(
task_id='clean_task3_output',
source_bucket_name='processing-bucket',
source_bucket_key=get_task3_path,
dest_bucket_name='archive-bucket',
dest_bucket_key=lambda ctx: f"archived/{ctx['ds']}/" + get_task3_path(ctx),
delete_src=True
)
# ===== 设置依赖关系 =====
# 任务1完成后并行执行两个分支的等待任务
start_processing >> [wait_for_task2, wait_for_task3]
# 每个分支内部的文件等待 -> 处理
wait_for_task2 >> process_task2
wait_for_task3 >> process_task3
# 两个分支都完成后执行最终处理
[process_task2, process_task3] >> final_processing
# 最终处理完成后清理文件(可选)
final_processing >> [clean_task2, clean_task3]
# 说明: 实际环境中需要添加具体的文件处理函数
# 例如:
# def process_file_content(bucket, key):
# from airflow.providers.amazon.aws.hooks.s3 import S3Hook
# s3 = S3Hook('aws_default')
# file_path = s3.download_file(key=key, bucket_name=bucket)
# # ... 处理文件内容 ...
工作流执行过程详解
1. 流程结构
[task1_start_processing]
/ \
[wait_for_task2] [wait_for_task3]
| |
[task2_process_results] [task3_process_results]
\ /
[task4_final_processing]
/ \
[clean_task2] [clean_task3]
2. 执行顺序
任务1 调用HTTP API启动处理:
传递两个S3输出路径(分别给任务2和任务3)
包含唯一标识符(run_id)
任务2路径
等待
/output/task2/{run_id}.result
文件出现处理文件内容
完成后触发清理操作
任务3路径
等待
/output/task3/{run_id}.result
文件出现处理文件内容
完成后触发清理操作
任务4
等待任务2和任务3都完成
执行最终汇总/合并操作
清理任务
将处理完成的文件移动到归档存储桶
删除源文件
生产环境增强建议
1. 文件处理函数(实际实现)
def process_task2_file(**context):
"""实际处理任务2文件的函数"""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3 = S3Hook('aws_default')
bucket = 'processing-bucket'
key = get_task2_path(context)
try:
# 下载文件
file_path = s3.download_file(key=key, bucket_name=bucket)
# 读取并处理文件内容
with open(file_path) as f:
content = f.read()
result = f"任务2处理结果: {len(content)}字节"
# 可选的: 存储结果供任务4使用
context['ti'].xcom_push(key='task2_result', value=result)
return result
except Exception as e:
# 错误处理和重试逻辑
context['ti'].xcom_push(key='task2_error', value=str(e))
raise
2. 任务合并增强
def handle_task4(**context):
"""增强的任务4处理函数 - 合并两个任务的结果"""
# 从任务2和任务3获取结果
task2_result = context['ti'].xcom_pull(
task_ids='task2_process_results',
key='task2_result'
)
task3_result = context['ti'].xcom_pull(
task_ids='task3_process_results',
key='task3_result'
)
# 错误检查
task2_error = context['ti'].xcom_pull(
task_ids='task2_process_results',
key='task2_error'
)
task3_error = context['ti'].xcom_pull(
task_ids='task3_process_results',
key='task3_error'
)
if task2_error or task3_error:
print(f"部分处理失败: task2={task2_error}, task3={task3_error}")
# 可在此处添加部分失败处理逻辑
return "部分成功"
# 合并结果逻辑
merged_result = f"任务2: {task2_result} | 任务3: {task3_result}"
# 发送最终通知
send_completion_notification(merged_result)
return merged_result
def send_completion_notification(message):
"""发送完成通知(示例)"""
# 实际实现可通过邮件、Slack、SMS等
print(f"处理完成通知: {message}")
3. 错误处理增强
# 在S3传感器中添加错误处理
wait_for_task2 = S3KeySensor(
# ...
soft_fail=True, # 文件不存在时任务标记为跳过
on_retry_callback=alert_on_retry,
on_failure_callback=alert_on_failure
)
def alert_on_retry(context):
"""重试时发送告警"""
print(f"任务{context['task'].task_id}正在重试 - 尝试#{context['ti'].try_number}")
def alert_on_failure(context):
"""失败时发送告警"""
error = context.get('exception') or "未知错误"
print(f"!!! 任务失败告警: {context['task'].task_id} - {error}")
4. 动态任务跳过
# 在任务4中使用自定义触发器规则
from airflow.utils.trigger_rule import TriggerRule
final_processing = PythonOperator(
task_id='task4_final_processing',
python_callable=handle_task4,
trigger_rule=TriggerRule.ALL_SUCCESS, # 默认值
# trigger_rule=TriggerRule.ONE_SUCCESS, # 任一成功即执行
# trigger_rule=TriggerRule.NONE_FAILED, # 没有失败则执行
)
生产部署注意事项
AWS权限配置:
确保Airflow工作节点具有S3访问权限
设置最小权限原则(仅限必要存储桶)
执行超时处理:
wait_for_task2 = S3KeySensor( timeout=3600 * 3, # 3小时超时 # ... )
跨地区访问优化:
# 在S3Hook中指定地区 s3 = S3Hook('aws_default', region_name='us-west-2')
文件处理批量化:
# 使用S3Prefix传感器处理批量文件 from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor wait_for_task2 = S3PrefixSensor( bucket_name='processing-bucket', prefix='output/task2/', delimiter='/', aws_conn_id='aws_default', verify=False )
结果持久化:
# 将最终结果保存到数据库 save_results = PythonOperator( task_id='save_results_to_db', python_callable=save_to_database, provide_context=True ) final_processing >> save_results
这个实现结合了S3文件触发机制和任务依赖管理,完美满足您的菱形工作流需求。实际部署时,只需填充具体的文件处理逻辑和业务操作即可。
评论区