Adding RabbitMQ
A common event that occurs in most applications is when a user signs up or registers for the app and the application needs to perform some action. This action could be sending a welcome email, updating a database, etc.
Lets take a look at setting up a message broker, and using the broker to send a message to a email service after a user registers foer the app.
Setting up AppHost
At this point you should be familiar with setting up an integration a aspire application. Per usual we’ll first add the app host package:
<PackageReference Include="Aspire.Hosting.RabbitMQ" Version="8.2.2" />Next create a RabbitMQ resource:
var messaging = builder .AddRabbitMQ("messaging") .WithManagementPlugin();You can see above we’ve added the management plugin so that we can view the RabbitMQ dashboard.

Events project
I’ve setup an additonal shared project to hold
different event types.
We have a MessageBody class that holds the
data and event type for the message.
public class MessageBody<T>{ public string EventType { get; set; } = null!; public T Data { get; set; } = default!; public DateTime DateTime { get; set; }}And a UserSignedUpEvent for the generic value:
public class UserSignedUpEvent{ public required string Email { get; set; }}We can also create a static class to hold the string names of different events. For example:
public static class EventTypes{ public const string UserRegistered = "user.registered"; public const string UserPasswordReset = "user.password_reset";}Adding to the register handler
In Auth.Api we need to add a reference
for the integration:
<PackageReference Include="Aspire.RabbitMQ.Client" Version="8.2.2" />Next we’ll add the Rabbit MQ connection the DI services:
builder.AddRabbitMQClient("messaging");internal static async Task<IResult> RegisterUserAsync( HttpContext httpContext, [FromBody] RegisterRequest request, [FromServices] UserManager<AppUser> userManager, [FromServices] IJwtTokenService jwtTokenService, [FromServices] IJwtCookieService jwtCookieService, [FromServices] RabbitMQ.Client.IConnection connection){ AppUser? existingUser = await userManager.FindByEmailAsync(request.Email); if (existingUser is not null) { return Results.Conflict("User already exists"); }
AppUser user = new() { Email = request.Email, UserName = request.UserName }; IdentityResult result = await userManager.CreateAsync(user, request.Password);
if (!result.Succeeded) { return Results.BadRequest(result.Errors.ToList().Select(x => x.Description)); }
var channel = connection.CreateModel();
// declare a queue channel.QueueDeclare( queue: QueueNames.UserEvents, durable: false, // if true, the queue will survive broker restarts exclusive: false, // if true, connection is exclusive to the queue autoDelete: false, // if true, the queue will be deleted when the number of consumers drops to zero arguments: null ); var body = new MessageBody<UserSignedUpEvent> { EventType = EventTypes.UserRegistered, Data = new UserSignedUpEvent { Email = user.Email }, DateTime = DateTime.UtcNow }; var encodedBody = JsonSerializer.SerializeToUtf8Bytes(body); channel.BasicPublish( exchange: ExchangeNames.DefaultExchange, mandatory: false, routingKey: QueueNames.UserEvents, basicProperties: null, body: encodedBody );
string token = jwtTokenService.GenerateJwtTokenAsync(user); jwtCookieService.SetJwtCookie(httpContext, token);
var userDto = new UserDto { UserId = user.Id, UserName = user.UserName, Email = user.Email, };
return Results.Ok(userDto);}Next we’ll need to create a service that responds to this event.
Email Service
Create a new service called User.Email,
and add the RabbitMQ client package:
<PackageReference Include="Aspire.RabbitMQ.Client" Version="8.2.2" />Next we’ll add the RabbitMQ connection to the Program.cs
var builder = Host.CreateApplicationBuilder(args);
builder.AddServiceDefaults();
builder.AddRabbitMQClient("messaging");builder.Services.AddHostedService<RegisterProcessingJob>();
var host = builder.Build();host.Run();Finally we can create the job that responds to the event:
using System.Text;using System.Text.Json;using _100Days.Events;using _100Days.Events.Users;using Microsoft.Extensions.DependencyInjection;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using RabbitMQ.Client;using RabbitMQ.Client.Events;
namespace _100Days.RabbitConsumer;
public class RegisterProcessingJob : BackgroundService{ private readonly ILogger<RegisterProcessingJob> _logger; private readonly IServiceProvider _serviceProvider; private IConnection? _messagingConnection; private IModel? _messageChannel; private EventingBasicConsumer? _consumer;
public RegisterProcessingJob( ILogger<RegisterProcessingJob> logger, IServiceProvider serviceProvider ) { _logger = logger; _serviceProvider = serviceProvider; }
protected override Task ExecuteAsync(CancellationToken stoppingToken) { string queueName = "user-events"; _messagingConnection = _serviceProvider.GetRequiredService<IConnection>();
_messageChannel = _messagingConnection.CreateModel(); _messageChannel.QueueDeclare( queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null ); _consumer = new EventingBasicConsumer(_messageChannel); _consumer.Received += ProcessMessageAsync;
_messageChannel.BasicConsume(queue: queueName, autoAck: true, consumer: _consumer);
return Task.CompletedTask; }
public override async Task StopAsync(CancellationToken cancellationToken) { await base.StopAsync(cancellationToken); _consumer!.Received -= ProcessMessageAsync; _messageChannel?.Dispose(); }
private void ProcessMessageAsync(object? sender, BasicDeliverEventArgs e) { try { var body = Encoding.UTF8.GetString(e.Body.ToArray()); MessageBody<UserSignedUpEvent>? messageBody = JsonSerializer.Deserialize<MessageBody<UserSignedUpEvent>>(body); if (messageBody is null) { _logger.LogWarning("Message body is null"); return; }
_logger.LogInformation("User signed up"); _logger.LogInformation("Sending email beep boop"); } catch (Exception exception) { _logger.LogError(exception, "Error processing message"); } }}Although we’re not actually sending an email in this example we can see how to communicate with the message queue in Aspire.