在 Amazon MWAA 上引入共享 VPC 支持 大数据博客
- 2026-01-27 13:26:09
- 13
Amazon MWAA 新增共享 VPC 支持
关键要点
在本文中,我们将介绍如何在 Amazon MWAA 中使用客户管理的 VPC 端点自动化环境部署。这种集成使得许多组织能够更高效地使用共享或受限的 VPC。通过客户管理的端点,用户可以更好地满足严格的安全政策,从而确保 VPC 资源访问的安全性。
在本文中,我们将展示如何使用客户管理的 VPC 端点自动化 Amazon Managed Workflows for Apache AirflowAmazon MWAA部署,提供对共享 VPC 或其他受限 VPC 的兼容性。
数据科学家和工程师将 Apache Airflow 打造成领先的开源工具,用于创建数据管道,这得益于其活跃的开源社区、熟悉的 Python 开发模式有向无环图工作流以及大量的预构建集成库。Amazon MWAA 是一个管理服务,使在 AWS 上运行 Airflow 变得简单,无需管理底层基础设施的运维负担。每个 Airflow 环境中,Amazon MWAA 创建一个单租户服务 VPC,承载元数据库和用于提供用户界面的 Web 服务器。Amazon MWAA 还管理在客户拥有和管理的 VPC 中的 Airflow 调度器和工作实例,以调度和运行与客户资源交互的任务。这些在客户 VPC 中的 Airflow 容器通过一个 VPC 端点 访问服务 VPC 中的资源。
许多组织选择使用 AWS Organizations 来 集中管理其 VPC,允许一个拥有账户中的 VPC 与不同参与账户中的资源共享。然而,由于在 VPC 外创建新路由被视为特权操作,参与账户无法在拥有账户的 VPC 中创建端点。除此之外,许多客户也不希望将创建 VPC 端点所需的安全权限扩展到所有分配 Amazon MWAA 环境的用户。除了 VPC 端点外,客户还希望通过 Amazon Simple Queue ServiceAmazon SQS队列来限制数据外发,而 Amazon SQS 访问是 Amazon MWAA 架构 的一个要求。
Amazon MWAA 的共享 VPC 支持使您能够在自己的 VPC 中管理自己的端点,从而增添了对共享或其他受限 VPC 的兼容性。指定客户管理的端点还可以通过明确限制对仅与 Amazon MWAA 环境所需资源的 VPC 资源访问,来满足严格的安全政策。本文将演示客户管理的端点如何与 Amazon MWAA 配合使用,并提供有关如何自动化这些端点配置的示例。
解决方案概述
Amazon MWAA 的共享 VPC 支持允许多个 AWS 账户将他们的 Airflow 环境创建到共享的、集中管理的 VPC 中。拥有 VPC 的账户拥有者与同一 AWS Organizations 中的其他账户参与者共享 Amazon MWAA 所需的两个私有子网。在子网共享后,参与者可以查看、创建、修改和删除共享给他们的 Amazon MWAA 环境。
当用户在创建环境时指定需要一个共享或政策受限的 VPC 时,Amazon MWAA 首先会创建服务 VPC 资源,然后进入最多可达 72 小时的待处理状态,同时会通过 Amazon EventBridge 发送状态变化的通知。这允许拥有者根据 Amazon MWAA 控制台或 API 中的端点服务信息,为参与者创建所需的端点,或者通过 AWS Lambda 函数和 EventBridge 规则进行编程,如本文中的示例。
在拥有账户创建这些端点后,单租户 Amazon MWAA VPC 中的端点服务将检测到端点连接事件并恢复环境创建。如果出现问题,您可以在此待处理状态下通过删除环境来取消环境创建。
此外,此功能还允许您移除 创建、修改和删除 VPC 端点特权 的 AWS 身份与访问管理IAM主体验证,即使不使用共享 VPC,因为该权限将转而施加在创建端点的 IAM 主体上在我们的示例中为 Lambda 函数。此外,Amazon MWAA 环境将提供 Airflow Celery 执行器用于排队任务Celery 执行器队列的 SQS 队列 Amazon 资源名称ARN,使您能够明确将这些资源添加到网络策略中,而不是提供更开放和通用的权限。
在此示例中,我们在同一账户中创建 VPC 和 Amazon MWAA 环境。对于跨账户的共享 VPC,EventBridge 规则和 Lambda 函数将存在于拥有账户中,而 Amazon MWAA 环境将在参与账户中创建。有关更多信息,请参阅 在 AWS 账户之间发送和接收 Amazon EventBridge 事件。
先决条件
您应具备以下先决条件:
一个 AWS 账户该账户中的 AWS 用户,具备创建 VPC、VPC 端点和 Amazon MWAA 环境的权限该账户中的一个 Amazon Simple Storage Service Amazon S3存储桶,包含一个名为 dags 的文件夹创建 VPC
我们将通过一个 AWS CloudFormation 模板开始创建一个限制性 VPC,以模拟创建必要的 VPC 端点和修改 SQS 端点策略。如果您想使用现有的 VPC,可以直接进入下一节。
下载 CloudFormation 模板,此模板在 选项三:创建没有 Internet 访问的 Amazon VPC 网络 中有所提及。从下载的 ZIP 压缩包中解压出文件 cfnvpcprivatebjsyml。现在我们将编辑 CloudFormation 模板,以限制对 Amazon SQS 的访问。在 cfnvpcprivatebjsyml 中,编辑 SqsVpcEndoint 部分,使其如下所示:yaml SqsVpcEndoint Type AWSEC2VPCEndpoint Properties ServiceName !Sub comamazonaws{AWSRegion}sqs VpcEndpointType Interface VpcId !Ref VPC PrivateDnsEnabled true SubnetIds !Ref PrivateSubnet1 !Ref PrivateSubnet2 SecurityGroupIds !Ref SecurityGroup PolicyDocument Statement Effect Allow Principal Action Resource []
此附加的策略文档条目可以防止 Amazon SQS 外发到未明确列出的任何资源。
现在我们可以创建 CloudFormation 堆栈。
在 AWS CloudFormation 控制台中,选择 创建堆栈。选择 上传模板文件。选择 选择文件。浏览至您修改后的文件。选择 下一步。在 堆栈名称中,输入 MWAAEnvironmentVPC。持续选择 下一步,直至到达审核页面。选择 提交。创建 Lambda 函数
我们有两种选项来自行管理我们的端点:手动和自动化。在此示例中,我们创建一个 Lambda 函数响应 Amazon MWAA 的 EventBridge 通知。您也可以使用 EventBridge 通知发送一个 Amazon Simple Notification ServiceAmazon SNS消息,例如通过电子邮件发送给有权限手动创建 VPC 端点的人。
首先,我们创建一个 Lambda 函数以响应 Amazon MWAA 发布的 EventBridge 事件。
在 Lambda 控制台,选择 创建函数。在 名称 中,输入 mwaacreatelambda。对于 运行时,选择 Python 311。选择 创建函数。在 代码 部分,针对 lambdafunction 输入以下代码:python import boto3 import json import logging
logger = logginggetLogger() loggersetLevel(loggingINFO)
def lambdahandler(event context) if event[detail][status]==PENDING detail=event[detail] name=detail[name] celeryExecutorQueue=detail[celeryExecutorQueue] subnetIds=detail[networkConfiguration][subnetIds] securityGroupIds=detail[networkConfiguration][securityGroupIds] databaseVpcEndpointService=detail[databaseVpcEndpointService]
# MWAA 不需要存储 VPC ID,但我们可以从子网中获取它 client = boto3client(ec2) response = clientdescribesubnets(SubnetIds=subnetIds) loggerinfo(response[Subnets][0][VpcId]) vpcId=response[Subnets][0][VpcId] loggerinfo(vpcId vpcId) webserverVpcEndpointService=None if detail[webserverAccessMode]==PRIVATEONLY webserverVpcEndpointService=event[detail][webserverVpcEndpointService] response = clientdescribevpcendpoints( VpcEndpointIds=[] Filters=[ {Name vpcid Values [vpcId]} {Name servicename Values [sqs]} ] MaxResults=1000 ) sqsVpcEndpoint=None for r in response[VpcEndpoints] if subnetIds[0] in r[SubnetIds] or subnetIds[1] in r[SubnetIds] # 我们按服务名称筛选,所以这必然是 SQS sqsVpcEndpoint=r break if sqsVpcEndpoint loggerinfo(Found SQS endpoint sqsVpcEndpoint[VpcEndpointId]) loggerinfo(sqsVpcEndpoint) pd = jsonloads(sqsVpcEndpoint[PolicyDocument]) for s in pd[Statement] if s[Effect]==Allow resource = s[Resource] loggerinfo(resource) if in resource loggerinfo( already allowed) elif celeryExecutorQueue in resource loggerinfo(celeryExecutorQueue already allowed) else s[Resource]append(celeryExecutorQueue) loggerinfo(Updating SQS policy to str(pd)) clientmodifyvpcendpoint( VpcEndpointId=sqsVpcEndpoint[VpcEndpointId] PolicyDocument=jsondumps(pd) ) break # 创建 MWAA 数据库端点 loggerinfo(creating endpoint to databaseVpcEndpointService) endpointName=namedatabase response = clientcreatevpcendpoint( VpcEndpointType=Interface VpcId=vpcId ServiceName=databaseVpcEndpointService SubnetIds=subnetIds SecurityGroupIds=securityGroupIds TagSpecifications=[ { ResourceType vpcendpoint Tags [ { Key Name Value endpointName } ] } ] ) loggerinfo(created VPCE response[VpcEndpoint][VpcEndpointId]) # 创建 MWAA web 服务器端点如果是私有 if webserverVpcEndpointService endpointName=namewebserver loggerinfo(creating endpoint to webserverVpcEndpointService) response = clientcreatevpcendpoint( VpcEndpointType=Interface VpcId=vpcId ServiceName=webserverVpcEndpointService SubnetIds=subnetIds SecurityGroupIds=securityGroupIds TagSpecifications=[ { ResourceType vpcendpoint Tags [ { Key Name Value endpointName } ] } ] ) loggerinfo(created VPCE response[VpcEndpoint][VpcEndpointId]) return { statusCode 200 body jsondumps(event[detail][status]) }选择 部署。在 Lambda 函数的 配置 标签中,在 常规配置 部分选择 编辑。在 超时,增加到 5 分钟,0 秒。选择 保存。在 权限 部分下,选择执行角色名称以编辑此函数的权限。在 权限策略 中,选择 策略名称 下的链接。选择 编辑,添加一个逗号以及以下语句:json { Sid Statement1 Effect Allow Action [ ec2DescribeVpcEndpoints ec2CreateVpcEndpoint ec2ModifyVpcEndpoint ec2DescribeSubnets ec2CreateTags ] Resource [ ] }
完整策略应如下所示:
json{ Version 20121017 Statement [ { Effect Allow Action logsCreateLogGroup Resource arnawslogsuseast1112233445566 } { Effect Allow Action [ logsCreateLogStream logsPutLogEvents ] Resource [ arnawslogsuseast1112233445566loggroup/aws/lambda/mwaacreatelambda ] } { Sid Statement1 Effect Allow Action [ ec2DescribeVpcEndpoints ec2CreateVpcEndpoint ec2ModifyVpcEndpoint ec2DescribeSubnets ec2CreateTags ] Resource [ ] } ]}
加速器永久免费版免费选择 下一步 直到到达审核页面。选择 保存更改。创建 EventBridge 规则
接下来,我们配置 EventBridge 将 Amazon MWAA 通知发送到我们的 Lambda 函数。
在 EventBridge 控制台,选择 创建规则。在 名称 中,输入 mwaacreate。选择 带事件模式的规则。选择 下一步。在 创建方式 中,选择 用户模式表单。选择 编辑模式。在 事件模式 中输入以下内容:
json{ source [awsairflow] detailtype [MWAA Environment Status Change]}
选择 下一步。在 选择目标 中,选择 Lambda 函数。您也可以指定 SNS 通知,以便在环境状态发生变更时接收消息。
在 函数 中,选择 mwaacreatelambda。选择 下一步 直至到达最后部分,然后选择 创建规则。创建 Amazon MWAA 环境
最后,我们使用客户管理的端点创建 Amazon MWAA 环境。
在 Amazon MWAA 控制台,选择 创建环境。在 名称 中,为您的环境输入一个唯一名称。在 Airflow 版本 中,选择最新的 Airflow 版本。在 S3 桶 中,选择 浏览 S3 并选择您的 S3 存储桶,或者输入 Amazon S3 URI。在 DAGs 文件夹 中,选择 浏览 S3 并选择 S3 存储桶中的 dags/ 文件夹,或者输入 Amazon S3 URI。选择 下一步。在 虚拟私有云 中,选择您之前创建的 VPC。在 Web 服务器访问 中,选择 公共网络可以访问 Internet。在 安全组 中,取消选择 创建新安全组。选择由 CloudFormation 模板创建的共享 VPC 安全组。由于之前步骤的 AWS PrivateLink 端点的安全组是自引用的,因此必须为您的 Amazon MWAA 环境选择相同的安全组。
在 端点管理 中,选择 客户管理的端点。保持其余设置为默认值,选择 下一步。选择 创建环境。当您的环境可用时,可以通过 Amazon MWAA 控制台上的 打开 Airflow UI 链接访问它。
清理
清理不活动的资源有助于降低成本,并且是最佳实践。如果您不删除资源,可能会产生额外的费用。要清理您的资源,请执行以下步骤:
1