From 26bbd9946a0090515b8ef9eb91e3740934abff68 Mon Sep 17 00:00:00 2001 From: Francis Pion Date: Fri, 27 Dec 2024 20:38:04 -0500 Subject: [PATCH] Event handlers are now idempotent. --- CHANGELOG.md | 7 +- .../Entities/UserEntity.cs | 4 + .../Handlers/ApiKeyEvents.cs | 102 ++++-- .../Handlers/EventHandled.cs | 6 + .../Handlers/EventNotHandled.cs | 26 ++ .../Handlers/OneTimePasswordEvents.cs | 76 +++- .../Handlers/RoleEvents.cs | 58 ++- .../Handlers/SessionEvents.cs | 76 +++- .../Handlers/UserEvents.cs | 334 +++++++++++++----- 9 files changed, 520 insertions(+), 169 deletions(-) create mode 100644 lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventHandled.cs create mode 100644 lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventNotHandled.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 80d3897..10d9678 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -Nothing yet. +### Fixed + +- Event handlers are now idempotent. ## [3.0.4] - 2024-12-27 @@ -67,7 +69,8 @@ Nothing yet. - Relational storage (PostgreSQL and Microsoft SQL Server) for Identity entities. - Unit and Integration tests. -[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.4...HEAD +[unreleased]: https://github.com/Logitar/Identity/compare/v3.0.5...HEAD +[3.0.5]: https://github.com/Logitar/Identity/compare/v3.0.4...v3.0.5 [3.0.4]: https://github.com/Logitar/Identity/compare/v3.0.3...v3.0.4 [3.0.3]: https://github.com/Logitar/Identity/compare/v3.0.2...v3.0.3 [3.0.2]: https://github.com/Logitar/Identity/compare/v3.0.1...v3.0.2 diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Entities/UserEntity.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Entities/UserEntity.cs index e94f3c2..6a4134a 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Entities/UserEntity.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Entities/UserEntity.cs @@ -198,6 +198,8 @@ public void Enable(UserEnabled @event) public void RemoveCustomIdentifier(UserIdentifierRemoved @event) { + Update(@event); + UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value); if (identifier != null) { @@ -241,6 +243,8 @@ public void SetAddress(UserAddressChanged @event) public void SetCustomIdentifier(UserIdentifierChanged @event) { + Update(@event); + UserIdentifierEntity? identifier = Identifiers.SingleOrDefault(x => x.Key == @event.Key.Value); if (identifier == null) { diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/ApiKeyEvents.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/ApiKeyEvents.cs index f4227ee..cf710c5 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/ApiKeyEvents.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/ApiKeyEvents.cs @@ -14,28 +14,39 @@ public sealed class ApiKeyEvents : INotificationHandler, { private readonly IdentityContext _context; private readonly ICustomAttributeService _customAttributes; + private readonly IMediator _mediator; - public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes) + public ApiKeyEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator) { _context = context; _customAttributes = customAttributes; + _mediator = mediator; } public async Task Handle(ApiKeyAuthenticated @event, CancellationToken cancellationToken) { - ApiKeyEntity apiKey = await _context.ApiKeys - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found."); + ApiKeyEntity? apiKey = await _context.ApiKeys + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - apiKey.Authenticate(@event); + if (apiKey == null || apiKey.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); + } + else + { + apiKey.Authenticate(@event); - await _context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationToken) { ApiKeyEntity? apiKey = await _context.ApiKeys.AsNoTracking() .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + if (apiKey == null) { apiKey = new(@event); @@ -44,6 +55,12 @@ public async Task Handle(ApiKeyCreated @event, CancellationToken cancellationTok await SaveActorAsync(apiKey, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } + else + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); } } @@ -51,54 +68,85 @@ public async Task Handle(ApiKeyDeleted @event, CancellationToken cancellationTok { ApiKeyEntity? apiKey = await _context.ApiKeys .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - if (apiKey != null) + + if (apiKey == null) + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); + } + else { _context.ApiKeys.Remove(apiKey); await DeleteActorAsync(apiKey, cancellationToken); await _customAttributes.RemoveAsync(EntityType.ApiKey, apiKey.ApiKeyId, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); } } public async Task Handle(ApiKeyRoleAdded @event, CancellationToken cancellationToken) { - ApiKeyEntity apiKey = await _context.ApiKeys - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found."); + ApiKeyEntity? apiKey = await _context.ApiKeys + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (apiKey == null || apiKey.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); + } + else + { + RoleEntity role = await _context.Roles + .SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken) + ?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found."); - RoleEntity role = await _context.Roles - .SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found."); + apiKey.AddRole(role, @event); - apiKey.AddRole(role, @event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(ApiKeyRoleRemoved @event, CancellationToken cancellationToken) { - ApiKeyEntity apiKey = await _context.ApiKeys + ApiKeyEntity? apiKey = await _context.ApiKeys .Include(x => x.Roles) - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found."); + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - apiKey.RemoveRole(@event); + if (apiKey == null || apiKey.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); + } + else + { + apiKey.RemoveRole(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(ApiKeyUpdated @event, CancellationToken cancellationToken) { - ApiKeyEntity apiKey = await _context.ApiKeys - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The API key entity 'StreamId={@event.StreamId}' could not be found."); + ApiKeyEntity? apiKey = await _context.ApiKeys + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (apiKey == null || apiKey.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, apiKey), cancellationToken); + } + else + { + apiKey.Update(@event); - apiKey.Update(@event); + await SaveActorAsync(apiKey, cancellationToken); + await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); - await SaveActorAsync(apiKey, cancellationToken); - await _customAttributes.UpdateAsync(EntityType.ApiKey, apiKey.ApiKeyId, @event.CustomAttributes, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } private async Task DeleteActorAsync(ApiKeyEntity apiKey, CancellationToken cancellationToken) diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventHandled.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventHandled.cs new file mode 100644 index 0000000..0e86042 --- /dev/null +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventHandled.cs @@ -0,0 +1,6 @@ +using Logitar.EventSourcing; +using MediatR; + +namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers; + +public record EventHandled(DomainEvent Event) : INotification; diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventNotHandled.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventNotHandled.cs new file mode 100644 index 0000000..d1595c9 --- /dev/null +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/EventNotHandled.cs @@ -0,0 +1,26 @@ +using Logitar.EventSourcing; +using Logitar.Identity.EntityFrameworkCore.Relational.Entities; +using MediatR; + +namespace Logitar.Identity.EntityFrameworkCore.Relational.Handlers; + +public record EventNotHandled : INotification +{ + public long ExpectedVersion { get; } + public long ActualVersion { get; } + + public EventNotHandled(long expectedVersion, long actualVersion) + { + ArgumentOutOfRangeException.ThrowIfNegative(expectedVersion); + ArgumentOutOfRangeException.ThrowIfNegative(actualVersion); + + ExpectedVersion = expectedVersion; + ActualVersion = actualVersion; + } + + public EventNotHandled(DomainEvent @event, AggregateEntity? aggregate) + { + ExpectedVersion = @event.Version - 1; + ActualVersion = aggregate?.Version ?? 0; + } +} diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/OneTimePasswordEvents.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/OneTimePasswordEvents.cs index 1287624..ac127bc 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/OneTimePasswordEvents.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/OneTimePasswordEvents.cs @@ -13,17 +13,20 @@ public sealed class OneTimePasswordEvents : INotificationHandler x.StreamId == @event.StreamId.Value, cancellationToken); + if (oneTimePassword == null) { oneTimePassword = new(@event); @@ -31,6 +34,12 @@ public async Task Handle(OneTimePasswordCreated @event, CancellationToken cancel _context.OneTimePasswords.Add(oneTimePassword); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } + else + { + await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken); } } @@ -38,46 +47,77 @@ public async Task Handle(OneTimePasswordDeleted @event, CancellationToken cancel { OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - if (oneTimePassword != null) + + if (oneTimePassword == null) + { + await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken); + } + else { _context.OneTimePasswords.Remove(oneTimePassword); await _customAttributes.RemoveAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); } } public async Task Handle(OneTimePasswordUpdated @event, CancellationToken cancellationToken) { - OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found."); + OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken); + } + else + { + oneTimePassword.Update(@event); - oneTimePassword.Update(@event); + await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); - await _customAttributes.UpdateAsync(EntityType.OneTimePassword, oneTimePassword.OneTimePasswordId, @event.CustomAttributes, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(OneTimePasswordValidationFailed @event, CancellationToken cancellationToken) { - OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found."); + OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken); + } + else + { + oneTimePassword.Fail(@event); - oneTimePassword.Fail(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(OneTimePasswordValidationSucceeded @event, CancellationToken cancellationToken) { - OneTimePasswordEntity oneTimePassword = await _context.OneTimePasswords - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The One-Time Password (OTP) entity 'StreamId={@event.StreamId}' could not be found."); + OneTimePasswordEntity? oneTimePassword = await _context.OneTimePasswords + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (oneTimePassword == null || oneTimePassword.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, oneTimePassword), cancellationToken); + } + else + { + oneTimePassword.Succeed(@event); - oneTimePassword.Succeed(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } } diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/RoleEvents.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/RoleEvents.cs index 8ae045a..af8db52 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/RoleEvents.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/RoleEvents.cs @@ -12,17 +12,20 @@ public sealed class RoleEvents : INotificationHandler, { private readonly IdentityContext _context; private readonly ICustomAttributeService _customAttributes; + private readonly IMediator _mediator; - public RoleEvents(IdentityContext context, ICustomAttributeService customAttributes) + public RoleEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator) { _context = context; _customAttributes = customAttributes; + _mediator = mediator; } public async Task Handle(RoleCreated @event, CancellationToken cancellationToken) { RoleEntity? role = await _context.Roles.AsNoTracking() .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + if (role == null) { role = new(@event); @@ -30,6 +33,12 @@ public async Task Handle(RoleCreated @event, CancellationToken cancellationToken _context.Roles.Add(role); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } + else + { + await _mediator.Publish(new EventNotHandled(@event, role), cancellationToken); } } @@ -37,35 +46,58 @@ public async Task Handle(RoleDeleted @event, CancellationToken cancellationToken { RoleEntity? role = await _context.Roles .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - if (role != null) + + if (role == null) + { + await _mediator.Publish(new EventNotHandled(@event, role), cancellationToken); + } + else { _context.Roles.Remove(role); await _customAttributes.RemoveAsync(EntityType.Role, role.RoleId, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); } } public async Task Handle(RoleUniqueNameChanged @event, CancellationToken cancellationToken) { - RoleEntity role = await _context.Roles - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The role entity 'StreamId={@event.StreamId}' could not be found."); + RoleEntity? role = await _context.Roles + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - role.SetUniqueName(@event); + if (role == null || role.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, role), cancellationToken); + } + else + { + role.SetUniqueName(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(RoleUpdated @event, CancellationToken cancellationToken) { - RoleEntity role = await _context.Roles - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The role entity 'StreamId={@event.StreamId}' could not be found."); + RoleEntity? role = await _context.Roles + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - role.Update(@event); + if (role == null || role.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, role), cancellationToken); + } + else + { + role.Update(@event); - await _customAttributes.UpdateAsync(EntityType.Role, role.RoleId, @event.CustomAttributes, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _customAttributes.UpdateAsync(EntityType.Role, role.RoleId, @event.CustomAttributes, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } } diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/SessionEvents.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/SessionEvents.cs index f959d48..9bbf478 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/SessionEvents.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/SessionEvents.cs @@ -13,17 +13,20 @@ public sealed class SessionEvents : INotificationHandler, { private readonly IdentityContext _context; private readonly ICustomAttributeService _customAttributes; + private readonly IMediator _mediator; - public SessionEvents(IdentityContext context, ICustomAttributeService customAttributes) + public SessionEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator) { _context = context; _customAttributes = customAttributes; + _mediator = mediator; } public async Task Handle(SessionCreated @event, CancellationToken cancellationToken) { SessionEntity? session = await _context.Sessions.AsNoTracking() .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + if (session == null) { UserEntity user = await _context.Users @@ -34,6 +37,12 @@ public async Task Handle(SessionCreated @event, CancellationToken cancellationTo user.Sessions.Add(session); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } + else + { + await _mediator.Publish(new EventNotHandled(@event, session), cancellationToken); } } @@ -41,46 +50,77 @@ public async Task Handle(SessionDeleted @event, CancellationToken cancellationTo { SessionEntity? session = await _context.Sessions .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - if (session != null) + + if (session == null) + { + await _mediator.Publish(new EventNotHandled(@event, session), cancellationToken); + } + else { _context.Sessions.Remove(session); await _customAttributes.RemoveAsync(EntityType.Session, session.SessionId, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); } } public async Task Handle(SessionRenewed @event, CancellationToken cancellationToken) { - SessionEntity session = await _context.Sessions - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The session entity 'StreamId={@event.StreamId}' could not be found."); + SessionEntity? session = await _context.Sessions + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (session == null || session.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, session), cancellationToken); + } + else + { + session.Renew(@event); - session.Renew(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(SessionSignedOut @event, CancellationToken cancellationToken) { - SessionEntity session = await _context.Sessions - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The session entity 'StreamId={@event.StreamId}' could not be found."); + SessionEntity? session = await _context.Sessions + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (session == null || session.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, session), cancellationToken); + } + else + { + session.SignOut(@event); - session.SignOut(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(SessionUpdated @event, CancellationToken cancellationToken) { - SessionEntity session = await _context.Sessions - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The session entity 'StreamId={@event.StreamId}' could not be found."); + SessionEntity? session = await _context.Sessions + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (session == null || session.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, session), cancellationToken); + } + else + { + session.Update(@event); - session.Update(@event); + await _customAttributes.UpdateAsync(EntityType.Session, session.SessionId, @event.CustomAttributes, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); - await _customAttributes.UpdateAsync(EntityType.Session, session.SessionId, @event.CustomAttributes, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } } diff --git a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/UserEvents.cs b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/UserEvents.cs index 5187747..bcd58f2 100644 --- a/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/UserEvents.cs +++ b/lib/Logitar.Identity.EntityFrameworkCore.Relational/Handlers/UserEvents.cs @@ -27,39 +27,58 @@ public sealed class UserEvents : INotificationHandler, { private readonly IdentityContext _context; private readonly ICustomAttributeService _customAttributes; + private readonly IMediator _mediator; - public UserEvents(IdentityContext context, ICustomAttributeService customAttributes) + public UserEvents(IdentityContext context, ICustomAttributeService customAttributes, IMediator mediator) { _context = context; _customAttributes = customAttributes; + _mediator = mediator; } public async Task Handle(UserAddressChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetAddress(@event); - user.SetAddress(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserAuthenticated @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.Authenticate(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.Authenticate(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserCreated @event, CancellationToken cancellationToken) { UserEntity? user = await _context.Users.AsNoTracking() .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + if (user == null) { user = new(@event); @@ -68,6 +87,12 @@ public async Task Handle(UserCreated @event, CancellationToken cancellationToken await SaveActorAsync(user, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } + else + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); } } @@ -75,190 +100,317 @@ public async Task Handle(UserDeleted @event, CancellationToken cancellationToken { UserEntity? user = await _context.Users .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - if (user != null) + + if (user == null) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else { _context.Users.Remove(user); await DeleteActorAsync(user, cancellationToken); await _customAttributes.RemoveAsync(EntityType.User, user.UserId, cancellationToken); await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); } } public async Task Handle(UserDisabled @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.Disable(@event); - user.Disable(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserEmailChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetEmail(@event); - user.SetEmail(@event); + await SaveActorAsync(user, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); - await SaveActorAsync(user, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserEnabled @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.Enable(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.Enable(@event); - await _context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserIdentifierChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users + UserEntity? user = await _context.Users .Include(x => x.Identifiers) - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SetCustomIdentifier(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetCustomIdentifier(@event); - await _context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserIdentifierRemoved @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users + UserEntity? user = await _context.Users .Include(x => x.Identifiers) - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.RemoveCustomIdentifier(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.RemoveCustomIdentifier(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserPasswordChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SetPassword(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetPassword(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserPasswordRemoved @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.RemovePassword(@event); - user.RemovePassword(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserPasswordReset @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SetPassword(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetPassword(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserPasswordUpdated @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SetPassword(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetPassword(@event); - await _context.SaveChangesAsync(cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserPhoneChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); + + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetPhone(@event); - user.SetPhone(@event); + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserRoleAdded @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); RoleEntity role = await _context.Roles .SingleOrDefaultAsync(x => x.StreamId == @event.RoleId.Value, cancellationToken) ?? throw new InvalidOperationException($"The role entity 'StreamId={@event.RoleId}' could not be found."); - user.AddRole(role, @event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.AddRole(role, @event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserRoleRemoved @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users + UserEntity? user = await _context.Users .Include(x => x.Roles) - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.RemoveRole(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.RemoveRole(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserSignedIn @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SignIn(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SignIn(@event); + + await _context.SaveChangesAsync(cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserUniqueNameChanged @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.SetUniqueName(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.SetUniqueName(@event); - await SaveActorAsync(user, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await SaveActorAsync(user, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } public async Task Handle(UserUpdated @event, CancellationToken cancellationToken) { - UserEntity user = await _context.Users - .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken) - ?? throw new InvalidOperationException($"The user entity 'StreamId={@event.StreamId}' could not be found."); + UserEntity? user = await _context.Users + .SingleOrDefaultAsync(x => x.StreamId == @event.StreamId.Value, cancellationToken); - user.Update(@event); + if (user == null || user.Version != (@event.Version - 1)) + { + await _mediator.Publish(new EventNotHandled(@event, user), cancellationToken); + } + else + { + user.Update(@event); - await SaveActorAsync(user, cancellationToken); - await _customAttributes.UpdateAsync(EntityType.User, user.UserId, @event.CustomAttributes, cancellationToken); - await _context.SaveChangesAsync(cancellationToken); + await SaveActorAsync(user, cancellationToken); + await _customAttributes.UpdateAsync(EntityType.User, user.UserId, @event.CustomAttributes, cancellationToken); + await _context.SaveChangesAsync(cancellationToken); + + await _mediator.Publish(new EventHandled(@event), cancellationToken); + } } private async Task DeleteActorAsync(UserEntity user, CancellationToken cancellationToken)