Handling events with maintainable Lambda functions

11 minute read

In the previous post we’ve seen how we can leverage the AWSLambdaSharpTemplate library and its templates to create a Lambda function that is testable, supports ASP.NET Core subsystems without polluting the business logic.

In this post, we will see how we can create a Lambda function that responds to events and we will dive into handling SNS notifications and SQS messages.

To have some concrete examples to work towards, we will create three functions the we will use to process files that are added to a well-known S3 bucket and notify the stakeholders.

Responding to an S3 event

The first function will respond to an event raised by S3 when a file is written in some bucket. This function will read the file, parse its content and enqueue a message in SQS for each entry in the file.

Getting started with the first function

Let’s start creating the first function by using the lambda-template-event-empty template from the Kralizek.Lambda.Templates template collection we installed in the previous post.

$ dotnet new lambda-template-event-empty -o S3EventHandlerFunction
$ cd S3EventHandlerFunction

Also, let’s add the Amazon.Lambda.S3Events package that contains a strongly-typed representation of the events fired by S3.

$ dotnet add package Amazon.Lambda.S3Events

Now, let’s replace the StringEventHandler class that came with the template with a stub for our own implementation of the interface IEventHandler<S3Event>.

public class S3EventHandler : IEventHandler<S3Event>
{
    public Task HandleAsync(S3Event? input, ILambdaContext context)
    {
        throw new System.NotImplementedException();
    }
}

Let’s also update the Function class, specifically the ConfigureServices method, to register our handler.

protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
{
    RegisterHandler<S3EventHandler>(services);
}

Like for the previous post, I’ll implement the steps that I present in this post in the repository available in GitHub. You can check the diff of the steps above here.

Implementing the event handler

This first function is responsible for responding to the event that S3 will raise when a file is added or modified to the bucket.

The S3EventHandler will do the following:

  • Fetch the modified file
  • Parse its content
  • Adds a message to an SQS queue for each entry of the file

It has to be noted that AWS might cluster multiple notifications into a single event. Each notification will be contained in the Records collection available in the payload of the event.

For a full example of a S3 event raised by AWS, check this article of the AWS documentation.

Before we can start implement our handler, we need to import some packages.

$ dotnet add package AWSSDK.SQS
$ dotnet add package AWSSDK.S3

Then we can introduce some auxilliary types

public record Entry(Guid Id);

public class S3EventHandlerOptions
{
    public string QueueUrl { get; init; } = default!;
}

And finally we can implement our event handler

public class S3EventHandler : IEventHandler<S3Event>
{
    private readonly IAmazonS3 _s3;
    private readonly IAmazonSQS _sqs;
    private readonly S3EventHandlerOptions _options;
    private readonly ILogger<S3EventHandler> _logger;

    public S3EventHandler(IAmazonS3 s3, IAmazonSQS sqs, IOptions<S3EventHandlerOptions> options, ILogger<S3EventHandler> logger)
    {
        _s3 = s3 ?? throw new ArgumentNullException(nameof(s3));
        _sqs = sqs ?? throw new ArgumentNullException(nameof(sqs));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task HandleAsync(S3Event? input, ILambdaContext context)
    {
        if (input is not { Records.Count: > 0 }) return;

        foreach (var record in input.Records)
        {
            string key = record.S3.Object.Key;
            string bucket = record.S3.Bucket.Name;

            _logger.LogDebug("Processing {Key} in {Bucket}", key, bucket);

            var response = await _s3.GetObjectAsync(bucket, key);

            var entries = JsonSerializer.Deserialize<Entry[]>(response.ResponseStream);

            if (entries is null) continue;

            foreach (var entry in entries)
            {
                await _sqs.SendMessageAsync(_options.QueueUrl, entry.Id.ToString());
            }
        }
    }
}

Notice how we send the entry identifiers as pure strings, it will be important when we implement the second function.

You can find the changes introduced in this step here.

Wiring up the event handler

Once the handler is ready, it’s time to update the Function class.

Let’s start by installing two packages

$ dotnet add package Microsoft.Extensions.Options.ConfigurationExtensions
$ dotnet add package AWSSDK.Extensions.NETCore.Setup

We’ve already encountered the first package in the previous post. The second package is used to glue the configuration of AWS SDK clients with the ASP.NET Core dependency injection system.

Once the packages are installed, we can update the Function class as follows

public class Function : EventFunction<S3Event>
{
    protected override void Configure(IConfigurationBuilder builder)
    {
        builder.AddInMemoryCollection(new Dictionary<string, string>
        {
            ["Options:QueueUrl"] = "url to my SQS queue"
        });
    }

    protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
    {
        RegisterHandler<S3EventHandler>(services);

        services.AddAWSService<IAmazonS3>();

        services.AddAWSService<IAmazonSQS>();

        services.Configure<S3EventHandlerOptions>(Configuration.GetSection("Options"));
    }
}

The major difference is that I changed the base type from EventFunction<string> to EventFunction<S3Event>. Also, I don’t particolarly care about logging, so I just removed the empty method. The handler will still receive a valid ILogger but no logging provider is configured.

You can find the changes introduced in this step here.

Handling SQS messages

The second function will handle the SQS message by processing each entry. When the processing is complete, it will send out a notification using SNS that will be handled by the third function.

Like for S3 events, the requests coming from SQS might contain more than one message in its Records property. We could use the same approach we used in the previous function: iterate through the items of the Records property, deserialize the content of their Body property and finally process the item.

For a full example of a SQS event raised by AWS, check this article of the AWS documentation.

Since this is quite a repetitive job, so the AWSLambdaSharpTemplate comes with a template designed for handling requests coming from SQS. This template includes an implementation of IEventHandler<SQSEvent> that takes care of iterating through the collection of records and delegates the processing of each one to a IMessageHandler<TMessage> where TMessage is the type of a class that represents the incoming message.

We will use this template for scaffolding this function.

Using the SQS message handler template

Let’s start by creating the function by using the template lambda-template-sqs-event. This template is designed to include everything needed to process SQS message, so we don’t need to explictly add the Amazon.Lambda.SQSEvents package.

$ dotnet new lambda-template-sqs-event -o SqsEventHandlerFunction
$ cd SqsEventHandlerFunction

Next, let’s remove the TestMessage and TestMessageHandler classes and add a representation of our message and an empty implementation of IMessageHandler<Entry>.

public record Entry(Guid Id);

public class SqsMessageHandler : IMessageHandler<Entry>
{
    public Task HandleAsync(Entry? message, ILambdaContext context)
    {
        throw new NotImplementedException();
    }
}

Finally, let’s update the ConfigureService so that we register our own message handler and bind it to our specific message type.

public class Function : EventFunction<SQSEvent>
{
    protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
    {
        services.UseQueueMessageHandler<Entry, SqsMessageHandler>();
    }
}

Once done, we’re ready to start implementing the message handler.

You can find the changes introduced in this step here.

Implementing the message handler

The message handler will be responsible for processing each item. Once each item is processed, a notification will be sent using SNS.

To begin with, let’s add the NuGet package containing the SDK client for SNS.

$ dotnet add package AWSSDK.SimpleNotificationService

Then, we define a record that will represent the notification we send out and an option class that we will use to specify the ARN of the SNS topic where our notifications will be pushed to.

public record SqsMessageHandlerOptions
{
    public string TopicArn { get; init; } = default!;
}

public record NotificationMessage (Guid Id, bool IsSuccess);

Whilst a message handler is a totally viable place to contain your business logic, you might want to delegate parts of the processing to other services, like an SDK or a HTTP call. To keep things simple, I created a IItemProcessor interface with its dummy implementation.

public interface IItemProcessor
{
    Task ProcessItem(Guid itemId);
}

public class NullItemProcessor : IItemProcessor
{
    public Task ProcessItem(Guid itemId) => Task.CompletedTask;
}

Finally, let’s implement the message handler.

public class SqsMessageHandler : IMessageHandler<Entry>
{
    private readonly IItemProcessor _processor;
    private readonly IAmazonSimpleNotificationService _sns;
    private readonly SqsMessageHandlerOptions _options;
    private readonly ILogger<SqsMessageHandler> _logger;

    public SqsMessageHandler(
        IItemProcessor processor,
        IAmazonSimpleNotificationService sns,
        IOptions<SqsMessageHandlerOptions> options,
        ILogger<SqsMessageHandler> logger)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _sns = sns ?? throw new ArgumentNullException(nameof(sns));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task HandleAsync(Entry? message, ILambdaContext context)
    {
        if (message is null) return;

        _logger.LogDebug("Processing item {Id}", message.Id);

        await _processor.ProcessItem(message.Id);

        var notification = JsonSerializer.Serialize(new NotificationMessage(message.Id, true));

        await _sns.PublishAsync(_options.TopicArn, notification);
    }
}

The HandleAsync method is responsible for all the work: it takes care of delegating the processing of the item to our backend service, creating a notification message and use the SNS SDK to publish its serialized version.

You can find the changes introduced in this step here.

Custom serializer

One last step is needed. By default, the library assumes that the body of the records of the request are a JSON representation of the type to be processed.

If we look at the body of the S3EventHandler in the previous function, you’ll notice that the body of the message we pass to SQS is a plain string representing the identifier of the entry to process.

await _sqs.SendMessageAsync(_options.QueueUrl, entry.Id.ToString());

So we need a way to override the deserialization process so that an instance of Entry can be created from a plain string.

This library offers an extensibility seam via the IMessageSerializer interface. You can use this interface in case the incoming message is not standard JSON. An example is that the payload of your message is encoded using XML.

public sealed class CustomSerializer : IMessageSerializer
{
    public TMessage? Deserialize<TMessage>(string input)
    {
        if (typeof(TMessage) != typeof(Entry))
        {
            throw new NotSupportedException();
        }

        if (!Guid.TryParse(input, out var id))
        {
            throw new FormatException("Format not valid");
        }

        return (TMessage)(object)new Entry(id);
    }
}

In the snippet above, you can see a very basic and rudimentary implementation that focuses only on the Entry type.

You can find the changes introduced in this step here.

Wiring up the message handler

Once the handler is ready, it’s time to update the Function class. Like for the S3 function, let’s install two packages that will help us registering AWS services and glue the Configuration system with the Options one.

$ dotnet add package AWSSDK.EXtensions.NETCore.Setup
$ dotnet add package Microsoft.Extensions.Options.ConfigurationExtensions

Once the packages are installed, we can update the Function class as follows.

public class Function : EventFunction<SQSEvent>
{
    protected override void Configure(IConfigurationBuilder builder)
    {
        builder.AddInMemoryCollection(new Dictionary<string, string>
        {
            ["Options:TopicArn"] = "ARN of my SNS topic"
        });
    }

    protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
    {
        services.UseQueueMessageHandler<Entry, SqsMessageHandler>().WithParallelExecution();

        services.AddAWSService<IAmazonSimpleNotificationService>();

        services.Configure<SqsMessageHandlerOptions>(Configuration.GetSection("Options"));

        services.UseCustomMessageSerializer<CustomSerializer>();

        services.AddSingleton<IItemProcessor, NullItemProcessor>();
    }
}

In the snippet above, I registered the SNS client from the AWS SDK, the custom message serializer and the IItemProcessor from the previous paragraphs.

You can find the changes introduced in this step here.

A special mention to the addition of WithParallelExecution() to the registration of the queue message handler. This extension instructs the function to process the records of the SQS event in parallel. It’s possible to specify the maximum degree of parallelism allowed but it defaults to the amount of cores available to the runtime.

Handling SNS notifications

Finally, the third function will be listening to the SNS topic and notify some stakeholders with the result of the processing.

Similarly to S3 and SQS events, the SNS ones can hold more than one notification at once.

For a full example of a SNS event raised by AWS, check this article of the AWS documentation.

Like for SQS, there is a template that specializes into handling SNS notifications.

Using the SNS notification handler template

Again, the first step is creating the function using the lambda-template-sns-event template.

$ dotnet new lambda-template-sns-event -o SnsEventHandlerFunction
$ cd SqsEventHandlerFunction

Then, we create a class/record representing the notification we will be receiving from SNS.

public record NotificationMessage (Guid Id, bool IsSuccess);

Next, we create a class implementing the interface INotificationHandler<TNotification>.

public class NotificationMessageHandler : INotificationHandler<NotificationMessage>
{
    public Task HandleAsync(NotificationMessage? notification, ILambdaContext context)
    {
        throw new NotImplementedException();
    }
}

Finally, we update the Function class so that the handler is registered in the ConfigureServices method.

public class Function : EventFunction<SNSEvent>
{
    protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
    {
        services.UseNotificationHandler<NotificationMessage, NotificationMessageHandler>();
    }
}

Like in the case of SQS messages, we can instruct the function to process the notifications in parallel by extending the registration with WithParallelExecution().

You can find the changes introduced in this step here.

Implementing the notification handler

The notification handler is extremely easy as its only responsibility is forwarding the incoming notification message to a backend service responsible for delivering the notification to all valid recipients. This backend service is abstracted behind a simple INotifier interface.

public interface INotifier
{
    Task NotifyRecipients(NotificationMessage message);
}

To keep the post simple, I added a simple implementation of the INotifier interface that immediately returns.

The handler by itself is quite simple. Checks that the incoming message is not null and forwards it to the notifier.

public class NotificationMessageHandler : INotificationHandler<NotificationMessage>
{
    private readonly INotifier _notifier;
    private readonly ILogger<NotificationMessageHandler> _logger;

    public NotificationMessageHandler(INotifier notifier, ILogger<NotificationMessageHandler> logger)
    {
        _notifier = notifier ?? throw new ArgumentNullException(nameof(notifier));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task HandleAsync(NotificationMessage? notification, ILambdaContext context)
    {
        if (notification is null) return;

        _logger.LogDebug("Notifying recipients of {Id}", notification.Id);

        await _notifier.NotifyRecipients(notification);
    }
}

You can find the changes introduced in this step here.

Wiring up the notification handler

Our last step is making sure that everything is properly registered in the Function class.

In our case, we need to register our INotifier, specifically a dummy implementation that I created.

public class Function : EventFunction<SNSEvent>
{
    protected override void ConfigureServices(IServiceCollection services, IExecutionEnvironment executionEnvironment)
    {
        services.UseNotificationHandler<NotificationMessage, NotificationMessageHandler>();

        services.AddSingleton<INotifier, DummyNotifier>();
    }
}

Now everything is ready to package and push this function to AWS.

You can find the changes introduced in this step here.

Recap

In this post we’ve seen how to leverage AWSLambdaSharpTemplate to create Lambda functions that respond to events that are raised by AWS.

Then we saw how we can use the templates contained in Kralizek.Lambda.Templates to create Lambda functions that specialize in responding to SQS and SNS events.

Here is a quick recap of the templates we used.

  • lambda-template-event-empty helped us create a function that processes incoming inputs without returning any output,
  • lambda-template-sqs-event can be used to create functions that handle SQS messages,
  • lambda-template-sns-event can be used to create functions that handle SNS messages