介绍 Amazon MWAA 对 Apache Airflow 版本 281 的支持 大数据博客
- 2026-01-27 12:26:50
- 12
Amazon MWAA 支持 Apache Airflow 281 版本
作者 Mansi Bhutada 和 Hernan Garcia日期 2024年2月28日来源 Amazon Managed Workflows for Apache Airflow (Amazon MWAA) 公告 永久链接 评论
关键要点
引入了Apache Airflow 281:为了提升工作流管理效率,Amazon MWAA现已支持Apache Airflow 281版本。对象存储:提供统一的对象存储抽象层,使工程师可以专注于数据管道。XCom UI改进:新增的XCom标签方便开发者直接在UI上查看任务间的数据传递。任务上下文日志:帮助记录任务失败原因,简化故障排除。数据集监听器:提供了对数据集事件的响应机制,方便用户开展自定义操作。Apache Airflow 281的发布
今天,我们宣布Amazon MWAA现已上线Apache Airflow 281环境。这一更新带来了一系列新功能和能力,使用户能够更有效地管理数据管道。利用这次更新,您可以轻松地设置或升级到Airflow 281版本。
对象存储功能
随着数据管道的规模扩大,工程师在多系统之间管理存储时面临许多挑战,包括独特的API、认证方法和数据访问规范。这往往需要编写自定义逻辑,并使用存储特定的操作符。Airflow现在提供了一个统一的对象存储抽象层,简化了这些操作,让工程师可以专注于数据管道。
以下是该功能的一些关键优势:
优势描述可移植的工作流可以通过最少的修改切换存储服务。高效的数据传输支持流式传输数据,避免占用过多内存。减少维护成本无需单独的操作符,使管道维护更简单。熟悉的编程体验可使用Python模块,如shutil进行文件操作。要使用Amazon S3的对象存储,您需要安装特定的包 s3fs 并指定Amazon提供者。

以下是将数据直接从Google Cloud Storage移动到Amazon S3的示例代码:
飞鱼加速器feiyu66pythongcsdatasource = ObjectStoragePath(gcs//sourcebucket/prefix/ connid=googleclouddefault)amazons3datatarget = ObjectStoragePath(s3//targetbucket/prefix/ connid=awsdefault)
with DAG( dagid=copyfromgcstoamazons3 startdate=datetime(2024 2 26) schedule=0 0 catchup=False tags=[28 ObjectStorage]) as dag
def listobjects(path ObjectStoragePath) gt list[ObjectStoragePath] objects = [f for f in pathiterdir() if fisfile()] return objectsdef copyobject(path ObjectStoragePath object ObjectStoragePath) objectcopy(dst=path)objectslist = listobjects(path=gcsdatasource)copyobjectpartial(path=amazons3datatarget)expand(object=objectslist)
有关Airflow对象存储的更多信息,请参见对象存储。
XCom UI 的增强
XCom跨通信能够在任务之间传递数据,促进它们之间的通信和协调。在之前的版本中,开发者需要切换视图才能查看与任务相关的XCom。现在,在Airflow 28中,XCom的键值将直接在Airflow的网格视图中展示,如下图所示。
新增的 XCom 标签提供了以下优点:
提高XCom的可视性:在UI中提供专门的标签,方便用户查看与DAG或任务相关的所有XCom。改进调试能力:可以直接在UI中查看XCom值,大大提高了调试的效率。任务上下文日志功能
任务生命周期管理对数据管道的平稳运行至关重要。然而,任务意外停止而导致的相关问题仍然存在。这种情况可能由于调度器超时、僵尸任务,或工作节点内存不足等多种原因引发。
传统上,由于核心Airflow组件如调度器或执行器触发的失败不会记录在任务日志中,这使得用户需要在Airflow UI外进行故障排除,增加了定位和解决问题的复杂性。
Airflow 28引入了一项显著的改进。现在,调度器和执行器等组件可以通过新的TaskContextLogger将错误消息直接转发至任务日志中。这一特性使您能够在一个地方查看与任务运行相关的所有错误信息,简化了排查任务失败原因的过程。
下图显示了任务被识别为僵尸,而调度器日志也包括在任务日志中。
要启用此功能,您需要将环境配置参数enabletaskcontextlogger设置为True。启用后,Airflow可以将调度器、执行器或回调运行上下文的日志发送到任务日志,并在Airflow UI中提供。
数据集的监听器钩子
数据集在Airflow 24中引入,是对数据源的逻辑分组,以实现数据感知调度和DAG间的依赖关系。例如,可以在生产者DAG更新数据集时,调度消费者DAG运行。
在Airflow 28中,为两个数据集事件新增了监听器:ondatasetcreated 和 ondatasetchanged。这有效地允许用户编写自定义代码,响应数据集管理操作。例如,您可以触发外部系统或发送通知。
使用数据集的监听器钩子非常简单。按照以下步骤创建ondatasetchanged的监听器:
创建监听器 (datasetlistenerpy)pythonfrom airflow import Datasetfrom airflowlisteners import hookimpl
@hookimpldef ondatasetchanged(dataset Dataset) 当数据集发生更改时执行的自定义代码。 print(调用外部端点)
验证特定数据集if dataseturi == s3//bucketprefix/objectkeyext print(对该数据集执行特定/不同操作)创建插件以注册监听器到Airflow环境 (datasetlistenerpluginpy)
pythonfrom airflowpluginsmanager import AirflowPluginfrom plugins import listenercode
class DatasetListenerPlugin(AirflowPlugin) name = datasetlistenerplugin listeners = [datasetlistener]
有关如何在Amazon MWAA中安装插件的更多信息,请参见安装自定义插件。
在Amazon MWAA中设置新的Airflow 281环境
您可以使用AWS管理控制台、API或AWS命令行界面 (AWS CLI)在您的账户和所选区域启动设置。如果您采用基础设施即代码IaC,可以使用AWS CloudFormation、AWS Cloud Development Kit (AWS CDK)或Terraform脚本来自动化设置。
在Amazon MWAA中成功创建Airflow 281环境后,某些包将自动安装在调度器和工作节点上。有关已安装包及其版本的完整列表,请参阅在Amazon MWAA环境中安装的Apache Airflow提供程序包。您可以使用要求文件安装其他包。
从旧版本的Airflow升级到281版本
通过将旧的Airflow 2x版本环境升级到281,您可以充分利用这些最新功能。有关就地版本升级的更多信息,请参阅升级Apache Airflow版本或介绍就地版本升级与Amazon MWAA。
结论
在这篇文章中,我们讨论了Airflow 28版本中引入的一些重要特性,如对象存储、新增的XCom标签、任务上下文日志、数据集的监听器钩子,以及如何开始使用它们。我们还提供了一些示例代码,以展示在Amazon MWAA中的实现。有关完整的更改列表,请查看Airflow发布说明。
有关Amazon MWAA的额外细节和代码示例,请访问Amazon MWAA用户指南和Amazon MWAA示例GitHub仓库。
Apache、Apache Airflow和Airflow是Apache软件基金会在美国及其他国家的注册商标或商标。
关于作者
Mansi Bhutada 是驻荷兰的ISV解决方案架构师。她帮助客户设计和实施在AWS上架构良好的解决方案,以解决其业务问题。她对数据分析和网络充满热情。工作之余,她喜欢实验烹饪、打壁球以及投入到有趣的桌游中。
Hernan Garcia 是驻荷兰的AWS高级解决方案架构师。他在金融服务行业工作,支持企业进行云端部署。他对无服务器技术、安全性和合规性感兴趣。业余时间喜欢与家人朋友共度时光,尝试不同美食。