Skip to content
This repository was archived by the owner on Jun 9, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

Nothing yet.
### Fixed

- Event handlers are now idempotent.

## [3.0.4] - 2024-12-27

Expand Down Expand Up @@ -67,7 +69,8 @@ Nothing yet.
- Relational storage (PostgreSQL and Microsoft SQL Server) for Identity entities.
- Unit and Integration tests.

[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.4...HEAD
[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.5...HEAD
[3.0.5]: https://github.com/Logitar/Identity/compare/v3.0.4...v3.0.5
[3.0.4]: https://github.com/Logitar/Identity/compare/v3.0.3...v3.0.4
[3.0.3]: https://github.com/Logitar/Identity/compare/v3.0.2...v3.0.3
[3.0.2]: https://github.com/Logitar/Identity/compare/v3.0.1...v3.0.2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public void Enable(UserEnabled @event)

public void RemoveCustomIdentifier(UserIdentifierRemoved @event)
{
Update(@event);

UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
if (identifier != null)
{
Expand Down Expand Up @@ -241,6 +243,8 @@ public void SetAddress(UserAddressChanged @event)

public void SetCustomIdentifier(UserIdentifierChanged @event)
{
Update(@event);

UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value);
if (identifier == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,39 @@ public sealed class ApiKeyEvents : INotificationHandler<ApiKeyAuthenticated>,
{
private readonly IdentityContext _context;
private readonly ICustomAttributeService _customAttributes;
private readonly IMediator _mediator;

public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes)
public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
{
_context = context;
_customAttributes = customAttributes;
_mediator = mediator;
}

public async Task Handle(ApiKeyAuthenticated @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

apiKey.Authenticate(@event);
if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.Authenticate(@event);

await _context.SaveChangesAsync(cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationToken)
{
ApiKeyEntity? apiKey = await _context.ApiKeys.AsNoTracking()
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null)
{
apiKey = new(@event);
Expand All @@ -44,61 +55,98 @@ public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationTok

await SaveActorAsync(apiKey, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
else
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
}

public async Task Handle(ApiKeyDeleted @event, CancellationToken cancellationToken)
{
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
if (apiKey != null)

if (apiKey == null)
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
_context.ApiKeys.Remove(apiKey);

await DeleteActorAsync(apiKey, cancellationToken);
await _customAttributes.RemoveAsync(EntityType.ApiKey, apiKey.ApiKeyId, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyRoleAdded @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
RoleEntity role = await _context.Roles
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");

RoleEntity role = await _context.Roles
.SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken)
?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found.");
apiKey.AddRole(role, @event);

apiKey.AddRole(role, @event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyRoleRemoved @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
ApiKeyEntity? apiKey = await _context.ApiKeys
.Include(x => x.Roles)
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

apiKey.RemoveRole(@event);
if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.RemoveRole(@event);

await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(ApiKeyUpdated @event, CancellationToken cancellationToken)
{
ApiKeyEntity apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found.");
ApiKeyEntity? apiKey = await _context.ApiKeys
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (apiKey == null || apiKey.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken);
}
else
{
apiKey.Update(@event);

apiKey.Update(@event);
await SaveActorAsync(apiKey, cancellationToken);
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await SaveActorAsync(apiKey, cancellationToken);
await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

private async Task DeleteActorAsync(ApiKeyEntity apiKey, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
using Logitar.EventSourcing;
using MediatR;

namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;

public record EventHandled(DomainEvent Event) : INotification;
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using Logitar.EventSourcing;
using Logitar.Identity.EntityFrameworkCore.Relational.Entities;
using MediatR;

namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers;

public record EventNotHandled : INotification
{
public long ExpectedVersion { get; }
public long ActualVersion { get; }

public EventNotHandled(long expectedVersion, long actualVersion)
{
ArgumentOutOfRangeException.ThrowIfNegative(expectedVersion);
ArgumentOutOfRangeException.ThrowIfNegative(actualVersion);

ExpectedVersion = expectedVersion;
ActualVersion = actualVersion;
}

public EventNotHandled(DomainEvent @event, AggregateEntity? aggregate)
{
ExpectedVersion = @event.Version - 1;
ActualVersion = aggregate?.Version ?? 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,111 @@ public sealed class OneTimePasswordEvents : INotificationHandler<OneTimePassword
{
private readonly IdentityContext _context;
private readonly ICustomAttributeService _customAttributes;
private readonly IMediator _mediator;

public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes)
public OneTimePasswordEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator)
{
_context = context;
_customAttributes = customAttributes;
_mediator = mediator;
}

public async Task Handle(OneTimePasswordCreated @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords.AsNoTracking()
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null)
{
oneTimePassword = new(@event);

_context.OneTimePasswords.Add(oneTimePassword);

await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
else
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
}

public async Task Handle(OneTimePasswordDeleted @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);
if (oneTimePassword != null)

if (oneTimePassword == null)
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
_context.OneTimePasswords.Remove(oneTimePassword);

await _customAttributes.RemoveAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordUpdated @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Update(@event);

oneTimePassword.Update(@event);
await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);

await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordValidationFailed @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Fail(@event);

oneTimePassword.Fail(@event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}

public async Task Handle(OneTimePasswordValidationSucceeded @event, CancellationToken cancellationToken)
{
OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken)
?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found.");
OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords
.SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken);

if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1))
{
await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken);
}
else
{
oneTimePassword.Succeed(@event);

oneTimePassword.Succeed(@event);
await _context.SaveChangesAsync(cancellationToken);

await _context.SaveChangesAsync(cancellationToken);
await _mediator.Publish(new EventHandled(@event), cancellationToken);
}
}
}
Loading
Loading