Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.Client;
using Microwave.Domain;

namespace Microwave.Eventstores.Persistence.CosmosDb
{
public class CosmosDb : ICosmosDb
{

private readonly IMicrowaveConfiguration _configuration;

public CosmosDb(IMicrowaveConfiguration configuration)
{
_configuration = configuration;
}

public DocumentClient GetCosmosDbClient()
{
return new DocumentClient(new Uri(_configuration.DatabaseConfiguration.ConnectionString),
_configuration.DatabaseConfiguration.PrimaryKey);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microwave.Domain.EventSourcing;
using Microwave.Domain.Identities;
using Microwave.Domain.Results;
using Microwave.EventStores;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microwave.Eventstores.Persistence.CosmosDb
{
public class CosmosDbClient : ICosmosDbClient
{
private readonly DocumentClient _client;
private IEnumerable<Type> _domainEventTypes;
private const string DatabaseId = "Eventstore";
private const string CollectionId = "DomainEvents";

public CosmosDbClient(ICosmosDb cosmosDb, IEnumerable<Assembly> assemblies)
{
_client = cosmosDb.GetCosmosDbClient();
var type = typeof(IDomainEvent);
_domainEventTypes = assemblies
.SelectMany(s => s.GetTypes())
.Where(p => type.IsAssignableFrom(p));

}

public async Task InitializeCosmosDbAsync()
{
var database = await _client.CreateDatabaseIfNotExistsAsync(new Database { Id = DatabaseId });
var collection = await _client.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(DatabaseId),
new DocumentCollection { Id = CollectionId });
if (database == null || collection == null)
{
throw new ArgumentException("Could not create Database or Collection with given CosmosDb Configuration Parameters!");
}
}


public async Task CreateDomainEventAsync(IDomainEvent domainEvent)
{
var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId);
await _client.CreateDocumentAsync(uri, domainEvent);

}


public async Task<IEnumerable<IDomainEvent>> GetDomainEventsAsync(Identity identity)
{
var query = _client.CreateDocumentQuery<DomainEventWrapper>(
UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId),
new FeedOptions { MaxItemCount = -1 })
.Where(e => e.DomainEvent.EntityId == identity)
.AsDocumentQuery();

var wrappedEvents = new List<JObject>();
while (query.HasMoreResults)
{
wrappedEvents.AddRange(await query.ExecuteNextAsync<JObject>());
}

var result = wrappedEvents.Select(e => JsonConvert.DeserializeObject(e.GetValue("DomainEvent").ToString(), _domainEventTypes.Single(x => x.Name == e.GetValue("DomainEventType").ToString()))).ToList();
return new List<IDomainEvent>();
}


public async Task<Result<IEnumerable<DomainEventWrapper>>> GetDomainEventsAsync(DateTimeOffset tickSince)
{
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId);
var query = _client.CreateDocumentQuery<DomainEventWrapper>(uri, queryOptions)
.Where(e => e.Created > tickSince);
return Result<IEnumerable<DomainEventWrapper>>.Ok(query.ToList());
}

public async Task<Document> CreateItemAsync(DomainEventWrapper domainEvent)
{
return await _client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId), domainEvent);
}

public async Task<Result<IEnumerable<DomainEventWrapper>>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince)
{
FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
var uri = UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId);
var query = _client.CreateDocumentQuery<DomainEventWrapper>(uri, queryOptions)
.Where(e => e.DomainEventType == eventType);
return Result<IEnumerable<DomainEventWrapper>>.Ok(query.ToList());
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Microwave.Domain.EventSourcing;
using Microwave.Domain.Identities;
using Microwave.Domain.Results;
using Microwave.EventStores;
using Microwave.EventStores.Ports;

namespace Microwave.Eventstores.Persistence.CosmosDb
{
public class CosmosDbEventRepository : IEventRepository
{
private readonly ICosmosDbClient _cosmosDbClient;


public CosmosDbEventRepository(ICosmosDbClient cosmosDbClient)
{
_cosmosDbClient = cosmosDbClient;
}

public async Task<Result<IEnumerable<DomainEventWrapper>>> LoadEventsByEntity(Identity entityId, long @from = 0)
{
throw new NotImplementedException();
//var uri = CreateUriForCosmosDb(entityId);
//var domainEvents = (await _client.ReadDocumentAsync<List<DomainEventWrapper>>(uri)).Document;
//return new EventStoreResult<IEnumerable<DomainEventWrapper>>(domainEvents, domainEvents.Max(e => e.Version));
}

public async Task<Result> AppendAsync(IEnumerable<IDomainEvent> domainEvents, long currentEntityVersion)
{
foreach (var domainEvent in domainEvents)
{

await _cosmosDbClient.CreateDomainEventAsync(domainEvent);
}

return Result.Ok();
}

public async Task<Result<IEnumerable<DomainEventWrapper>>> LoadEvents(DateTimeOffset tickSince = default(DateTimeOffset))
{
var result = await _cosmosDbClient.GetDomainEventsAsync(tickSince);
if (result.Value.Any())
{
return Result<IEnumerable<DomainEventWrapper>>.Ok(result.Value);
}
else
{
return Result<IEnumerable<DomainEventWrapper>>.NotFound(null);
}
}

public async Task<Result<IEnumerable<DomainEventWrapper>>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince = default(DateTimeOffset))
{
var result = _cosmosDbClient.LoadEventsByTypeAsync(eventType, tickSince);
return Result<IEnumerable<DomainEventWrapper>>.Ok(result.Result.Value);
}

public async Task<Result<DateTimeOffset>> GetLastEventOccuredOn(string domainEventType)
{
throw new NotImplementedException();
//FeedOptions queryOptions = new FeedOptions { MaxItemCount = -1 };
//var uri = UriFactory.CreateDocumentCollectionUri(DatabaseName, CollectionId);
//var query = _client.CreateDocumentQuery<DomainEventWrapper>(uri, queryOptions).ToList();
//var latestEventTime = query.Max(e => e.Created);

//return Result<DateTimeOffset>.Ok(latestEventTime);
}

private Uri CreateUriForCosmosDb(Identity identity)
{
//return UriFactory.CreateDocumentUri(DatabaseName, CollectionId, identity.Id);
return null;
}
}
}
10 changes: 10 additions & 0 deletions CosmosDb/Microwave.Eventstores.Persistence.CosmosDb/ICosmosDb.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Threading.Tasks;
using Microsoft.Azure.Documents.Client;

namespace Microwave.Eventstores.Persistence.CosmosDb
{
public interface ICosmosDb
{
DocumentClient GetCosmosDbClient();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microwave.Domain.EventSourcing;
using Microwave.Domain.Identities;
using Microwave.Domain.Results;
using Microwave.EventStores;

namespace Microwave.Eventstores.Persistence.CosmosDb
{
public interface ICosmosDbClient
{
Task CreateDomainEventAsync(IDomainEvent domainEvent);
Task<Result<IEnumerable<DomainEventWrapper>>> GetDomainEventsAsync(DateTimeOffset tickSince);
Task<Result<IEnumerable<DomainEventWrapper>>> LoadEventsByTypeAsync(string eventType, DateTimeOffset tickSince);
Task<Document> CreateItemAsync(DomainEventWrapper domainEvent);
Task<IEnumerable<IDomainEvent>> GetDomainEventsAsync(Identity identity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.DocumentDB.Core" Version="2.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Microwave.EventStores\Microwave.EventStores.csproj" />
<ProjectReference Include="..\..\Microwave\Microwave.csproj" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion Microwave.Discovery/ServiceBaseAddressCollection.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;

namespace Microwave
namespace Microwave.Discovery
{
public class ServiceBaseAddressCollection : List<Uri>
{
Expand Down
95 changes: 95 additions & 0 deletions Microwave.Eventstores.Persistence.CosmosDb/CosmosDb.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
using System.Collections.ObjectModel;
using System.Security;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

namespace Microwave.Persistence.CosmosDb
{
public class CosmosDb : ICosmosDb
{
public SecureString PrimaryKey { get; set; }

public Uri CosmosDbLocation { get; set; }

public DocumentClient GetCosmosDbClient()
{
var client = new DocumentClient(CosmosDbLocation, PrimaryKey);
return client;
}

public string DatabaseId => "Eventstore";
public string EventsCollectionId => "DomainEvents";
public string SnapshotsCollectionId => "Snapshots";
public string ServiceMapCollectionId => "ServiceMap";
public string StatusCollectionId => "Status";
public string VersionCollectionId => "Versions";

public async Task InitializeCosmosDb()
{
var client = new DocumentClient(CosmosDbLocation, PrimaryKey);

var domainEventsCollection = new DocumentCollection
{
Id = EventsCollectionId
};
domainEventsCollection.UniqueKeyPolicy = new UniqueKeyPolicy
{
UniqueKeys =
new Collection<UniqueKey>
{
new UniqueKey {Paths = new Collection<string> {"/Version", "/DomainEvent/EntityId/Id"}}
}
};

var snapShotCollection = new DocumentCollection
{
Id = SnapshotsCollectionId
};
snapShotCollection.UniqueKeyPolicy = new UniqueKeyPolicy
{
UniqueKeys =
new Collection<UniqueKey>
{
new UniqueKey {Paths = new Collection<string> {"/Version", "/Id/Id"}}
}
};
var versionCollection = new DocumentCollection
{
Id = VersionCollectionId
};
//versionCollection.UniqueKeyPolicy = new UniqueKeyPolicy
//{
// UniqueKeys =
// new Collection<UniqueKey>
// {
// new UniqueKey {Paths = new Collection<string> {"/Version", "/DomainEventType"}}
// }
//};

try
{
await client.CreateDatabaseIfNotExistsAsync(new Database {Id = DatabaseId});
await client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(DatabaseId),
domainEventsCollection);
await client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(DatabaseId),
new DocumentCollection{Id = StatusCollectionId});
await client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(DatabaseId),
snapShotCollection);
await client.CreateDocumentCollectionIfNotExistsAsync(
UriFactory.CreateDatabaseUri(DatabaseId),
versionCollection);
}
catch (DocumentClientException e)
{
throw new ArgumentException(
$"Could not create Database or Collection with given CosmosDb Configuration Parameters! Exception : {e}" );

}
}
}
}
Loading