Creating an SNS Fanout in Serverless
data:image/s3,"s3://crabby-images/69d2c/69d2c2d04d66e955538718d1823c20cb9e18fbf4" alt="Creating an SNS Fanout in Serverless"
Previously we've covered what the Barstool Queue Engine is and how we use it for long-running services. Our next problem was how to handle alerting our various services that those jobs had finished or errored. As we rely heavily on the Serverless framework, we implemented this principle using their tools. The idea of an SNS fanout is to send out messages to a single SNS topic that any SQS handler could listen to and interpret. To begin, we need a new topic, MyOutputTopic
, created as follows that utilizes a multi-stage deployment:
resources:
Resources:
MyOutputTopic:
Type: 'AWS::SNS::Topic'
Properties:
TopicName: MyOutput-${opt:stage}
Since our services are in separate repositories, we also need to make sure the ARN of that topic is accessible outside of this single serverless deployment. Serverless provides an Output
implementation to direct our stack to export values:
resources:
Outputs:
MyOutputTopic:
Value: !Ref MyOutputTopic
Export:
Name: MyOutputTopic-${opt:stage}
In our other repositories, we built out SQS queues in Serverless that subscribe to a singular topic. We had to create access policies so that we had permission to read from the topic:
resources:
# Create a basic queue
Service2Queue:
Type: 'AWS::SQS::Queue'
Properties:
QueueName: Service2-${opt:stage}
VisibilityTimeout: 60
# Create an SNS subscription for the queue above
Service2Subscription:
Type: 'AWS::SNS::Subscription'
Properties:
TopicArn: MyOutputTopic-${opt:stage}
Endpoint:
Fn::GetAtt: [Service2Queue, Arn]
Protocol: sqs
RawMessageDelivery: 'true'
# Provide the proper permissions for Service2Queue to recieve messages from MyOutputTopic-{$opt:stage}
Service2QueuePolicy:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal: '*'
Action: SQS:SendMessage
Resource:
- Fn::GetAtt: [Service2Queue, Arn]
Condition:
ArnEquals:
AWS:SourceArn: MyOutputTopic-${opt:stage}
Queues:
- !Ref Service2Queue
Now we have an SQS queue Service2Queue
that is listening to all messages sent to the MyOutputTopic-${opt:stage}
topic. This would have been enough if we truly cared about receiving everything but we don't, so as an additional step we employed SQS filters to limit what each queue would receive. When we send messages to BQE we include the following snippet as part of the message:
MessageAttributes: {
filterKey1: {
DataType: 'String',
StringValue: `${output.filterKey1}`
},
filterKey2: {
DataType: 'String.Array',
StringValue: JSON.stringify(output.filterKey2)
}
}
Once the above exists on a message the Serverless definition for the subscription can be updated to include a FilterPolicy
to limit the invocations to only the messages that the queue needs to handle:
resources:
Service2Subscription:
Type: 'AWS::SNS::Subscription'
Properties:
TopicArn: MyOutputTopic-${opt:stage}
Endpoint:
Fn::GetAtt: [Service2Queue, Arn]
Protocol: sqs
RawMessageDelivery: 'true'
FilterPolicy:
filterKey1: ['foo']
filterKey2: ['bar']
The last piece of code to drive this home is to hook the subscription into a lambda function to process the messages:
functions:
MyOutputTopicQueue:
handler: handlers/sqs.myOutputTopic
events:
- sqs:
arn:
Fn::GetAtt: [Service2Queue, Arn]
batchSize: 10
SNS fanouts have proven incredibly helpful for our needs. I've put all this yml
together in a Gist for easy reference. To learn more about SNS, Amazon has additional information here.