public async Task PlaceOrderAsync(PlaceOrderCommand command, CancellationToken ct)
{
await using var tx = await _db.Database.BeginTransactionAsync(ct);
var order = new Order(/* ... */);
_db.Orders.Add(order);
var evt = new OrderPlacedIntegrationEvent(order.Id, order.CustomerId, order.Total);
var outboxMessage = OutboxMessage.CreateFrom(evt, aggregateId: order.Id.ToString());
_db.OutboxMessages.Add(outboxMessage);
await _db.SaveChangesAsync(ct);
await tx.CommitAsync(ct);
}
public sealed class OutboxMessage
{
public Guid Id { get; private set; }
public string MessageType { get; private set; } = default!;
public string Payload { get; private set; } = default!;
public string? Metadata { get; private set; }
public DateTime CreatedAtUtc { get; private set; }
public byte Status { get; private set; }
private OutboxMessage() { } // EF
public static OutboxMessage CreateFrom(object @event, string? aggregateId = null)
{
return new OutboxMessage
{
Id = Guid.NewGuid(),
MessageType = @event.GetType().FullName!,
Payload = JsonSerializer.Serialize(@event, @event.GetType()),
Metadata = JsonSerializer.Serialize(new
{
AggregateId = aggregateId,
CorrelationId = Activity.Current?.TraceId.ToString(),
CausationId = Activity.Current?.SpanId.ToString()
}),
CreatedAtUtc = DateTime.UtcNow,
Status = 0 // Pending
};
}
}