案例中心

  • 首页
  • 案例中心
  • 使用 Amazon SNS FIFO 存档和重放消息 计算博客

使用 Amazon SNS FIFO 存档和重放消息 计算博客

2026-01-27 12:47:38

使用 Amazon SNS FIFO 存档和重放消息

关键要点

Amazon SNS FIFO 新功能:支持自动存档和重放发布到 FIFO 主题的消息,提供无代码的功能,促进故障恢复和状态复制。故障恢复场景:开发者可以重处理消息,帮助恢复因下游应用或依赖问题引起的故障。状态复制场景:新应用可以复制已订阅应用的状态,确保数据的一致性。启用存档策略:在 SNS 控制台创建 FIFO 主题时,可定义存档政策以确定消息存储时间。

今天,AWS 宣布了一项新功能,允许用户将发布到 SNS FIFO先进先出主题的消息进行存档和重放。当启用存档政策时,SNS FIFO 主题会自动执行以下操作:

事件存档:通过无代码、就地的消息存档,存储消息无需外部资源。用户只需定义主题的存档政策,包括所需的保存期限1 到 365 天。事件重放:订阅者可以利用内置的进度报告和消息过滤功能,无需编码来重放消息。订阅者只需为其订阅应用重放政策,定义开始和结束时间戳。

此功能在故障恢复和状态复制场景中非常有用。

故障恢复

在故障恢复场景中,开发者可以利用这一功能重新处理部分消息,从而恢复下游应用的故障或依赖问题。例如,假设一个搜索应用需要重新处理消息,因为搜索引擎的索引已被清除。要启动恢复,搜索应用可以通过 SetSubscriptionAttributes API 操作更新其现有订阅的 ReplayPolicy 属性,从特定时间点开始接收消息,而不是从应用存档政策时开始。

状态复制

在状态复制场景中,此功能允许新应用复制以前订阅应用的状态。比如一个内部数据仓库应用需要复制外部搜索应用的状态,以便将搜索引擎中的数据提供给产品经理和其他内部员工。数据仓库应用通过 Subscribe API 操作将其新创建的端点例如,Amazon SQS FIFO 队列订阅到主题,并设置 ReplayPolicy 订阅属性。

如果它希望复制搜索引擎的完整状态,它可能会将其 ReplayPolicy 中的时间戳设置为与搜索引擎订阅的创建日期和时间相吻合,从而确保所有曾经发送给搜索引擎的数据也会发送给数据仓库工具。

在 SNS 控制台启用存档政策

在创建新的 SNS FIFO 主题时,会看到一个存档政策选项。此政策决定 SNS 保存消息的时间,从而使消息在必要时可供重新发送到订阅者。存档政策默认不激活,用户必须手动为每个主题启用它,或者自动化该操作。

例如,该 FIFO 主题的保存期限设定为 30 天,但用户可以将此持续时间调整为 1 到 365 天。一旦激活存档政策,发送到此主题的消息会在定义的时间内进行存档。

在创建主题后,可以检查主题详细信息以确认存档政策是否生效。存档政策旁边的状态会显示为 Active。

通过将 SQS FIFO 队列订阅到 SNS FIFO 主题,用户可以重放消息,并且 Replay status 将显示 Not running。用户可以同时将 FIFO 和标准 SQS 队列订阅到 SNS FIFO 主题,为各种用例需求提供灵活性。要启动重放,用户只需在 SNS 控制台中选择 Replay,然后选择 Start replay。

启动重放后,会出现窗口,允许用户指定开始和结束日期,以及精确的时间点,以重放存档的消息。此功能让用户可以设定特定时间表,仅重放感兴趣的消息,而不是所有存档消息。当选择 Start replay 后,服务开始向订阅者发送消息。

使用 Amazon SNS FIFO 存档和重放消息 计算博客

用户还可以使用 AWS CloudFormation 和 AWS Serverless Application Model (AWS SAM) 定义 SNS FIFO 存档和重放功能的设置。

用例

通过重放事件进行微服务的错误恢复

在一个保险应用使用多个微服务的场景中,某个索赔处理微服务遇到错误并丢失了一个索赔,这种疏漏可能导致工作负载不同步。利用存档和重放功能,用户可以查看并重放错误检测时的事件。这使得微服务能够识别遗漏的事件并完成必要的操作,保证系统保持更新和准确。

在多个区域之间复制状态

如果一个应用跨越多个区域,而在其主要区域的微服务遇到困难,用户可以将基础设施复制到另一个区域,以便采取主动/备份的设置。可以将流量重新路由至次要区域的备用微服务,通过事件重放保持同步。用户可以在 SNS 重放政策中设置结束时间,如果未定义,则重放将持续直到发送完所有最新消息。

之后,SNS 订阅将恢复正常功能,捕获所有新消息。这种方法适用于多种状态复制场景,例如跨区域备份策略,有助于最小化停机时间并防止消息丢失。

配置 SNS FIFO 存档和重放以处理汽车保险

管理汽车保险索赔需要及时的协调。这个操作指南展示了 SNS FIFO 和 SQS FIFO 的结合优势,以正确的顺序处理索赔。SQS FIFO 和 SQS 标准队列均可以订阅 SNS FIFO 主题,为处理索赔提供灵活性。SNS FIFO 的存档和重放功能至关重要,确保下游微服务出现中断时并不会影响索赔的完整性。

前提条件

AWS Command Line Interface (CLI):可以通过 文档 下载并设置。JQ:安装请遵循 安装说明。

第一步 使用 AWS CLI 创建资源并存储变量

在终端中运行以下命令:

bash

定义 AWS 区域

REGION=(aws configure get region)

为汽车保险索赔创建 SNS FIFO 主题

AUTOINSURANCETOPICARN=(aws sns createtopic name AutoInsuranceClaimsTopicfifo attributes FifoTopic=trueContentBasedDeduplication=trueDisplayName=Auto Insurance Claims Topic region REGION jq r TopicArn)

创建主处理和重放的 SQS FIFO 队列

AUTOINSURANCEQUEUEURL=(aws sqs createqueue queuename AutoInsuranceClaimsQueuefifo attributes FifoQueue=true region REGION jq r QueueUrl)AUTOINSURANCEREPLAYQUEUEURL=(aws sqs createqueue queuename AutoInsuranceReplayQueuefifo attributes FifoQueue=true region REGION jq r QueueUrl)

获取两者的 ARN

AUTOINSURANCEQUEUEARN=(aws sqs getqueueattributes queueurl AUTOINSURANCEQUEUEURL attributenames QueueArn region REGION jq r AttributesQueueArn)AUTOINSURANCEREPLAYQUEUEARN=(aws sqs getqueueattributes queueurl AUTOINSURANCEREPLAYQUEUEURL attributenames QueueArn region REGION jq r AttributesQueueArn)

定义允许主题发布到两个队列的策略

SQSPOLICYTEMPLATE={Policy { Version 20121017 Statement [ { Sid 1 Effect Allow Principal { Service snsamazonawscom } Action [sqsSendMessage] Resource [AUTOINSURANCEQUEUEARN AUTOINSURANCEREPLAYQUEUEARN] Condition { ArnLike { awsSourceArn [AUTOINSURANCETOPICARN] } } } ]}}

应用访问策略到队列

aws sqs setqueueattributes queueurl AUTOINSURANCEQUEUEURL attributes file//lt(echo SQSPOLICYTEMPLATE)aws sqs setqueueattributes queueurl AUTOINSURANCEREPLAYQUEUEURL attributes file//lt(echo SQSPOLICYTEMPLATE)

将主队列订阅到创建的 SNS FIFO 主题

aws sns subscribe topicarn AUTOINSURANCETOPICARN protocol sqs notificationendpoint AUTOINSURANCEQUEUEARN region REGION

第二步 在 SNS FIFO 主题上设置存档策略

修改 SNS FIFO 主题的属性以设置保存期限。这决定消息在主题存档中的保存时间。此示例使用 30 天。

bash

将 SNS FIFO 主题的保存期限设置为 30 天

aws sns settopicattributes region REGION topicarn AUTOINSURANCETOPICARN attributename ArchivePolicy attributevalue {MessageRetentionPeriod30}

第三步 发布汽车保险索赔详情

向 SNS FIFO 主题发布一个示例索赔。这一步模拟了现实世界场景,其中保险索赔需要由主题的订阅者处理。

bash

获取当前时间戳并发布样本保险索赔

TIMESTAMPSTART=(date u FTT000Z)aws sns publish region REGION topicarn AUTOINSURANCETOPICARN message { claimtype collision registration AB123CDE } messagegroupid group1

第四步 阅读汽车保险索赔细节

从主 SQS FIFO 队列中检索保险索赔细节。此过程模拟了读取保险索赔以采取行动的过程。读取消息后,索赔将从队列中删除,以避免重复处理。

bash

从主队列获取索赔详情,然后删除以避免冗余

MESSAGE=(aws sqs receivemessage region REGION queueurl AUTOINSURANCEQUEUEURL output json)MESSAGETEXT=(echo MESSAGE jq r Messages[0]Body)MESSAGERECEIPT=(echo MESSAGE jq r Messages[0]ReceiptHandle)aws sqs deletemessage region REGION queueurl AUTOINSURANCEQUEUEURL receipthandle MESSAGERECEIPTecho 收到的索赔详情 {MESSAGETEXT}

第五步 将重放 SQS 队列订阅到 SNS FIFO 主题

为确保没有索赔丢失,为 SQS FIFO 队列订阅配置重放政策。此政策设置了消息重放至 SQS FIFO 队列的时间表。在此,用户将重放队列订阅到主题,并定义其重放政策,然后监控重放队列的状态。一旦完成,从次要 SQS FIFO 队列读取重放的索赔详情。如果最初出现任何处理问题,仍会有第二次机会处理索赔。

将重放队列订阅到 SNS FIFO 主题:

bash

将重放队列订阅到主题并定义其重放政策

NEWSUBSCRIPTIONARN=(aws sns subscribe region REGION topicarn AUTOINSURANCETOPICARN protocol sqs returnsubscriptionarn notificationendpoint AUTOINSURANCEREPLAYQUEUEARN attributes {ReplayPolicy{PointTypeTimestampStartingPointTIMESTAMPSTART}} output json jq r SubscriptionArn)

监控重放状态:

bash

等待重放完成

while [[ (aws sns getsubscriptionattributes region REGION subscriptionarn NEWSUBSCRIPTIONARN output text awk END{print 9}) != Completed ]] do printf sleep 5 done echo 重放完成

读取重放的消息并将消息从队列中删除:

bash

获取重放的消息并将其从队列中移除

REPLAYEDMESSAGE=(aws sqs receivemessage region REGION queueurl AUTOINSURANCEREPLAYQUEUEURL output json)REPLAYEDMESSAGETEXT=(echo REPLAYEDMESSAGE jq r Messages[0]Body)REPLAYEDMESSAGERECEIPT=(echo REPLAYEDMESSAGE jq r Messages[0]ReceiptHandle)aws sqs deletemessage region REGION queueurl AUTOINSURANCEREPLAYQUEUEURL receipthandle REPLAYEDMESSAGERECEIPTecho 收到的重放索赔详情 {REPLAYEDMESSAGETEXT}

清理

为了避免不必要的费用,请清理在本操作指南中创建的资源:

bash

删除主 SQS FIFO 队列

aws sqs deletequeue queueurl AUTOINSURANCEQUEUEURL region REGION

删除重放 SQS FIFO 队列

aws sqs deletequeue queueurl AUTOINSURANCEREPLAYQUEUEURL region REGION

魔方加速器免费正版

取消设置 ArchivePolicy 属性

aws sns settopicattributes region REGION topicarn AUTOINSURANCETOPICARN attributename ArchivePolicy attributevalue {}

删除 SNS FIFO 主题

aws sns deletetopic topicarn AUTOINSURANCETOPICARN region REGION

结论

新的 SNS FIFO 存档和重放功能为事件驱动的应用程序提供了坚实的基础,强调了故障恢复和应用状态复制。这些功能使开发者能够高效管理和恢复干扰,并确保在不同应用实例或环境中的状态复制。

想要开始使用这一新的 SNS FIFO 功能,用户可以通过 AWS 管理控制台、AWS CLI、AWS 软件开发工具包 (SDK) 或 AWS CloudFormation 开启。有关价格信息,请参见 SNS 定价 和 SQS 定价。

有关更多无服务器学习资源,请访问 Serverless Land。