Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 85 additions & 50 deletions pkg/portallocator/port_allocator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package portallocator

import (
"errors"
"fmt"
"maps"
"slices"
Expand Down Expand Up @@ -50,29 +51,31 @@ type AllocatedPortRange struct {
Count uint `json:"count"`
}

func (p *PortAllocator) AllocatePortRange(ctx restate.ObjectContext, params AllocatePortsParams) (*AllocatedPortRange, error) {
allowedPorts, err := restate.Get[AllowedPorts](ctx, stateAllowed)
if err != nil {
return nil, fmt.Errorf("failed to get allowed ports: %w", err)
}
type AllocationErrorKind = string

allocatedPortsMap, err := restate.Get[map[string]AllocatedPortRange](ctx, stateAllocated)
if err != nil {
return nil, fmt.Errorf("failed to get allocated ports: %w", err)
}
const NO_ALLOCATABLE_RANGES AllocationErrorKind = "NO_ALLOCATABLE_RANGES"

{
allocation, ok := allocatedPortsMap[params.Network]
if ok {
if allocation.Count != uint(params.Count) {
return nil, restate.TerminalErrorf("already allocated port range for %s with count %d does not match requested count %d", params.Network, allocation.Count, params.Count)
}
type PortAllocationError struct {
kind AllocationErrorKind
allowed AllowedPorts
allocated []AllocatedPortRange
count uint
}

return &allocation, nil
}
func (e *PortAllocationError) Error() string {
msg := "Unknown port allocation error"

switch e.kind {
case NO_ALLOCATABLE_RANGES:
msg = "Failed to find valid port range to allocate"
}

allocated := slices.Collect(maps.Values(allocatedPortsMap))
return msg
}

func allocate(allowedPorts AllowedPorts, allocatedPorts []AllocatedPortRange, portCount uint) (*AllocatedPortRange, error) {
allocated := make([]AllocatedPortRange, len(allocatedPorts))
copy(allocated, allocatedPorts)

slices.SortFunc(allocated, func(a, b AllocatedPortRange) int {
return int(a.From) - int(b.From)
Expand All @@ -81,66 +84,98 @@ func (p *PortAllocator) AllocatePortRange(ctx restate.ObjectContext, params Allo
if len(allocated) == 0 {
newAllocated := AllocatedPortRange{
From: allowedPorts.From,
Count: uint(params.Count),
Count: portCount,
}

if !allowedPorts.IsAllowed(newAllocated) {
return nil, restate.TerminalErrorf("not enough ports available")
return nil, errors.New("Not enough ports available")
}

allocatedPortsMap[params.Network] = newAllocated

restate.Set(ctx, stateAllocated, allocatedPortsMap)

return &newAllocated, nil
}

prevAllocated := allocated[0]
for idx, current := range allocated[1:] {
previous := allocated[idx]

if (prevAllocated.From - allowedPorts.From) >= uint(params.Count) {
newAllocated := AllocatedPortRange{
From: allowedPorts.From,
Count: uint(params.Count),
if current.From == previous.From {
continue
}

if current.From - (previous.From + previous.Count) >= portCount {
newAllocated := AllocatedPortRange{
From: previous.From + previous.Count,
Count: portCount,
}

return &newAllocated, nil
}
}

allocatedPortsMap[params.Network] = newAllocated
lastAllocated := allocated[len(allocated) - 1]

restate.Set(ctx, stateAllocated, allocatedPortsMap)
if lastAllocated.From + lastAllocated.Count + portCount < allowedPorts.To {
newAllocated := AllocatedPortRange{
From: lastAllocated.From + lastAllocated.Count,
Count: portCount,
}

return &newAllocated, nil
}

for _, allocatedPort := range allocated[1:] {
if allocatedPort.From-(prevAllocated.From+prevAllocated.Count) >= uint(params.Count) {
newAllocated := AllocatedPortRange{
From: prevAllocated.From + prevAllocated.Count,
Count: uint(params.Count),
}
err := PortAllocationError {
kind: NO_ALLOCATABLE_RANGES,
allowed: allowedPorts,
allocated: allocated,
count: portCount,
}
return nil, &err
}

allocatedPortsMap[params.Network] = newAllocated
func (p *PortAllocator) AllocatePortRange(ctx restate.ObjectContext, params AllocatePortsParams) (*AllocatedPortRange, error) {
allowedPortsPtr, err := restate.Get[*AllowedPorts](ctx, stateAllowed)
if err != nil {
return nil, fmt.Errorf("failed to get allowed ports: %w", err)
}
if allowedPortsPtr == nil {
return nil, restate.TerminalErrorf("port allocator not initialized: missing %q", stateAllowed)
}

restate.Set(ctx, stateAllocated, allocatedPortsMap)
allowedPorts := *allowedPortsPtr
if allowedPorts.From == 0 && allowedPorts.To == 0 {
return nil, fmt.Errorf("Invalid allowed port range: From=%d, To=%d", allowedPorts.From, allowedPorts.To)
}

return &newAllocated, nil
}
allocatedPortsMapPtr, err := restate.Get[*map[string]AllocatedPortRange](ctx, stateAllocated)
if err != nil {
return nil, fmt.Errorf("failed to get allocated ports: %w", err)
}
if allocatedPortsMapPtr == nil {
return nil, restate.TerminalErrorf("port allocator not initialized: missing %q", stateAllocated)
}

lastAllocated := allocated[len(allocated)-1]
allocatedPortsMap := *allocatedPortsMapPtr

newAllocated := AllocatedPortRange{
From: lastAllocated.From + lastAllocated.Count,
Count: uint(params.Count),
}
{
allocation, ok := allocatedPortsMap[params.Network]
if ok {
if allocation.Count != uint(params.Count) {
return nil, restate.TerminalErrorf("already allocated port range for %s with count %d does not match requested count %d", params.Network, allocation.Count, params.Count)
}

if !allowedPorts.IsAllowed(newAllocated) {
return nil, restate.TerminalErrorf("not enoughports available")
return &allocation, nil
}
}

allocatedPortsMap[params.Network] = newAllocated
allocated := slices.Collect(maps.Values(allocatedPortsMap))
newAllocated, err := allocate(allowedPorts, allocated, params.Count)
if err != nil {
return nil, restate.TerminalError(err)
}

allocatedPortsMap[params.Network] = *newAllocated
restate.Set(ctx, stateAllocated, allocatedPortsMap)

return &newAllocated, nil
return newAllocated, nil
}

func (p *PortAllocator) FreePortsForNetwork(ctx restate.ObjectContext, network string) error {
Expand Down