Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/.idea
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DataPlane.Sdk.Api" Version="0.0.1-alpha12"/>
<PackageReference Include="DataPlane.Sdk.Api" Version="0.0.1-alpha13"/>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.8"/>
</ItemGroup>

Expand Down
20 changes: 20 additions & 0 deletions Samples/Streaming/Consumer/Consumer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DataPlane.Sdk.Api" Version="0.0.1-alpha13"/>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.10"/>
<PackageReference Include="NATS.Client.Core" Version="2.7.0-preview.7"/>
</ItemGroup>

<!-- uncomment when debugging the local SDK-->
<!-- <ItemGroup>-->
<!-- <ProjectReference Include="..\..\..\DataPlane.Sdk.Api\DataPlane.Sdk.Api.csproj"/>-->
<!-- </ItemGroup>-->
</Project>
40 changes: 40 additions & 0 deletions Samples/Streaming/Consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# syntax=docker/dockerfile:1

# Build stage
FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build
WORKDIR /src

# Copy everything and restore
# (Assumes this Dockerfile sits in the project directory next to the .csproj)
COPY . .

ARG NUGET_USERNAME
ARG NUGET_PASSWORD

RUN dotnet nuget add source "https://nuget.pkg.github.com/eclipse-dataplane-core/index.json" \
--name dcore \
--username $NUGET_USERNAME \
--password $NUGET_PASSWORD \
--store-password-in-clear-text \
&& dotnet restore Consumer.csproj \
&& dotnet nuget remove source dcore

# Publish (framework-dependent)
RUN dotnet publish Consumer.csproj -c Release -o /app/publish --no-restore

# Runtime stage
FROM mcr.microsoft.com/dotnet/aspnet:9.0 AS final
WORKDIR /app

# Configure ASP.NET Core to listen on port 8080 and set environment
ENV ASPNETCORE_URLS=http://+:8080 \
ASPNETCORE_ENVIRONMENT=Production

# Copy published output
COPY --from=build /app/publish ./

# Expose HTTP
EXPOSE 8080

# Start the app
ENTRYPOINT ["dotnet", "./Consumer.dll"]
81 changes: 81 additions & 0 deletions Samples/Streaming/Consumer/Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Consumer.Nats;
using DataPlane.Sdk.Api;
using DataPlane.Sdk.Core;
using DataPlane.Sdk.Core.Domain.Model;
using Microsoft.IdentityModel.Tokens;
using static DataPlane.Sdk.Core.Data.DataFlowContextFactory;

namespace Consumer;

public static class Extensions
{
public static void AddDataPlaneSdk(this IServiceCollection services, IConfiguration configuration)
{
// initialize and configure the DataPlaneSdk
var dataplaneConfig = configuration.GetSection("DataPlaneSdk");
var config = dataplaneConfig.Get<DataPlaneSdkOptions>() ?? throw new ArgumentException("Configuration invalid!");
var dataFlowContext = () => CreatePostgres(configuration, config.RuntimeId);


var sdk = new DataPlaneSdk
{
DataFlowStore = dataFlowContext,
RuntimeId = config.RuntimeId,
OnStart = dataFlow =>
{
if (dataFlow.Destination == null)
{
return StatusResult<DataFlow>.BadRequest("DataFlow.Destination cannot be null");
}

var dataService = services.BuildServiceProvider().GetRequiredService<NatsSubscriber>();
dataService.Start(NatsDataAddress.Create(dataFlow.Destination)).Wait();
return StatusResult<DataFlow>.Success(dataFlow);
},
OnTerminate = df => StatusResult.Success(),
OnSuspend = _ => StatusResult.Success(),
OnPrepare = f =>
{
f.IsConsumer = true;
f.State = DataFlowState.Prepared;
return StatusResult<DataFlow>.Success(f);
},
OnComplete = _ => StatusResult.Success()
};

services.AddSingleton<NatsSubscriber>();


// add SDK core services
services.AddSdkServices(sdk, dataplaneConfig);

// Configuration for keycloak. Effectively, this sets the default authentication scheme to "KeycloakJWT",
// foregoing the SDK default authentication scheme and using Keycloak as the identity provider.
// This assumes that Keycloak is running on http://keycloak:8080, which is the default if launched with docker-compose.

var jwtSettings = configuration.GetSection("JwtSettings");

services.AddAuthentication("KeycloakJWT")
.AddJwtBearer("KeycloakJWT", options =>
{
// Configure Keycloak as the Identity Provider
options.Authority = jwtSettings["Authority"];
options.RequireHttpsMetadata = false; // Only for develop

options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidIssuer = jwtSettings["Issuer"],
ValidateAudience = true,
ValidAudience = jwtSettings["Audience"],
ValidateIssuerSigningKey = true,
ValidateLifetime = true,
ValidateActor = false,
ValidateTokenReplay = true
};
});

// wire up ASP.net authorization handlers
services.AddSdkAuthorization();
}
}
8 changes: 8 additions & 0 deletions Samples/Streaming/Consumer/Nats/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Consumer.Nats;

public static class Constants
{
public static readonly string DataAddressType = "NatsStream";
public static readonly string ForwardChannelSuffix = "forward";
public static readonly string ReplyChannelSuffix = "reply";
}
91 changes: 91 additions & 0 deletions Samples/Streaming/Consumer/Nats/NatsDataAddress.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using System.Text.Json;
using System.Text.Json.Serialization;
using DataPlane.Sdk.Core.Domain.Model;

namespace Consumer.Nats;

public class NatsDataAddress : DataAddress
{
public NatsDataAddress() : base(Constants.DataAddressType)
{
Properties["endpointType"] = "https://example.com/natsdp/v1/nats";
}

public string NatsEndpoint
{
init => Properties["endpoint"] = value;
get => Properties["endpoint"].ToString() ?? throw new InvalidOperationException("No 'endpoint' endpointProperty found");
}

public string Channel
{
get
{
var property = GetEndpointProperty("channel");
return (property?.Value ?? null) ?? throw new InvalidOperationException("No 'channel' endpointProperty found");
}
init => StringEndpointProperty("channel", value);
}

public string ReplyChannel
{
init => StringEndpointProperty("replyChannel", value);
}

private EndpointProperty? GetEndpointProperty(string key)
{
var props = Properties["endpointProperties"];

List<EndpointProperty> epp;
if (props is JsonElement)
{
epp = JsonSerializer.Deserialize<List<EndpointProperty>>(props.ToString());

Check warning on line 42 in Samples/Streaming/Consumer/Nats/NatsDataAddress.cs

View workflow job for this annotation

GitHub Actions / build

Converting null literal or possible null value to non-nullable type.

Check warning on line 42 in Samples/Streaming/Consumer/Nats/NatsDataAddress.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'json' in 'List<EndpointProperty>? JsonSerializer.Deserialize<List<EndpointProperty>>(string json, JsonSerializerOptions? options = null)'.
}
else
{
epp = props as List<EndpointProperty>;

Check warning on line 46 in Samples/Streaming/Consumer/Nats/NatsDataAddress.cs

View workflow job for this annotation

GitHub Actions / build

Converting null literal or possible null value to non-nullable type.
}


var property = epp?.Find(p => p.Key.Equals(key));
return property;
}

public static NatsDataAddress Create(DataAddress rawSource)
{
return new NatsDataAddress
{
Properties = rawSource.Properties,
Id = rawSource.Id
};
}

private void StringEndpointProperty(string key, string endpointPropertyValue)
{
if (!Properties.TryGetValue("endpointProperties", out var existing))
{
existing = new List<EndpointProperty>();
Properties["endpointProperties"] = existing;
}

var epProps = existing as List<EndpointProperty>;
epProps!.Add(new EndpointProperty
{
Key = key,
Type = "string",
Value = endpointPropertyValue
});
}

public class EndpointProperty
{
[JsonPropertyName("key")]
public required string Key { get; init; }

[JsonPropertyName("type")]
public required string Type { get; init; }

[JsonPropertyName("value")]
public required string Value { get; init; }
}
}
54 changes: 54 additions & 0 deletions Samples/Streaming/Consumer/Nats/NatsSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using NATS.Client.Core;

namespace Consumer.Nats;

public class NatsSubscriber(ILogger<NatsSubscriber> logger)
{
private static readonly IDictionary<string, Task> BackgroundTasks = new Dictionary<string, Task>();
private static readonly IDictionary<string, CancellationTokenSource> CancellationTokens = new Dictionary<string, CancellationTokenSource>();

public async Task Start(NatsDataAddress nats)

Check warning on line 10 in Samples/Streaming/Consumer/Nats/NatsSubscriber.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
var channel = nats.Channel;
var cts = new CancellationTokenSource();

BackgroundTasks.Add(channel, Task.Run(async () =>
{
await using var conn = new NatsConnection(new NatsOpts
{
Url = nats.NatsEndpoint
});

while (!cts.Token.IsCancellationRequested)
{
await foreach (var mesg in conn.SubscribeAsync<string>(channel, cancellationToken: cts.Token))
{
var data = mesg.Data;
logger.LogInformation("Received {Data}", data);
}
}
}, cts.Token));

CancellationTokens.Add(channel, cts);
}


public async Task Stop(NatsDataAddress nats)
{
var channel = nats.Channel;

if (CancellationTokens.TryGetValue(channel, out var ct))
{
await ct.CancelAsync();
logger.LogDebug("Stopping {Channel}", channel);
CancellationTokens.Remove(channel);
}

if (BackgroundTasks.TryGetValue(channel, out var task))
{
await task;
BackgroundTasks.Remove(channel);
logger.LogDebug("Stopped {Channel}", channel);
}
}
}
33 changes: 33 additions & 0 deletions Samples/Streaming/Consumer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using Consumer;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi
builder.Services.AddOpenApi();
builder.Services.AddControllers();
// SDK: add all services, read configuration etc.
builder.Services.AddDataPlaneSdk(builder.Configuration);

// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi
builder.Services.AddOpenApi();


// Configure the HTTP request pipeline.
var app = builder.Build();

if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
}


app.UseHttpsRedirection();


app.UseAuthentication();
app.UseAuthorization();

app.MapControllers();

app.Run();
23 changes: 23 additions & 0 deletions Samples/Streaming/Consumer/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"$schema": "https://json.schemastore.org/launchsettings.json",
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:8081",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Local"
}
},
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "https://localhost:7118;http://localhost:5252",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
Loading
Loading