Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions src/main/java/com/manwe/dsl/config/DSLServerConfigs.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import net.neoforged.neoforge.common.ModConfigSpec;

import java.net.InetAddress;
import java.util.List;

public class DSLServerConfigs {

private static final ModConfigSpec.Builder BUILDER = new ModConfigSpec.Builder();
Expand All @@ -13,19 +16,28 @@ public class DSLServerConfigs {
public static final ModConfigSpec.ConfigValue<Integer> REGION_SIZE;
public static final ModConfigSpec.ConfigValue<Integer> WORKER_SIZE;

public static final ModConfigSpec.ConfigValue<Boolean> USE_ARBITER;
//public static final ModConfigSpec.ConfigValue<List<InetAddress>> CONNECTIONS;

static {
BUILDER.push("server");
ARBITER_ADDR = BUILDER.comment("Complete address of the arbiter").define("arbiter_addr","http://localhost:8080");
BUILDER.push("Server");
IS_PROXY = BUILDER.comment("Is this node a proxy? (only one can be the proxy)").define("is_proxy",true);
WORKER_ID = BUILDER.comment("If false which worker id?").define("worker_id",1);
BUILDER.pop();

BUILDER.push("topology").comment("This values need to be equal in all nodes");
BUILDER.push("Nodes").comment("This values need to be equal in all nodes");
WORKER_SIZE = BUILDER.comment("Number of workers").define("worker_size",1);
REGION_SIZE = BUILDER.comment("Size of the smallest server in region cords. (1 Region = 32x32 chunks)").define("region_size",4);
BUILDER.pop();

BUILDER.push("Auto start").comment("Address assignment for workers");
USE_ARBITER = BUILDER.comment("Use the arbiter").define("use_arbiter",true);
ARBITER_ADDR = BUILDER.comment("Complete address of the arbiter, This is ignored if use_arbiter = false").define("arbiter_addr","http://localhost:8080");
//BUILDER.push("If proxy").comment("Connections to the workers");
//CONNECTIONS = BUILDER.comment("").define("arbiter_addr","http://localhost:8080");
//BUILDER.pop();
BUILDER.pop();

/*
BUILDER.push("Into Pattern").comment(
"""
Expand Down
9 changes: 0 additions & 9 deletions src/main/java/com/manwe/dsl/connectionRouting/Region.java

This file was deleted.

64 changes: 60 additions & 4 deletions src/main/java/com/manwe/dsl/connectionRouting/RegionRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ public class RegionRouter {
private final EventLoopGroup ioGroup;

//Topology information
private final int workerSize = DSLServerConfigs.WORKER_SIZE.get();
private final int regionSize = DSLServerConfigs.REGION_SIZE.get();
private static final int nWorkers = DSLServerConfigs.WORKER_SIZE.get();
private static final int regionSize = DSLServerConfigs.REGION_SIZE.get();
private static final int workerId = DSLServerConfigs.WORKER_ID.get();

public RegionRouter(ProxyDedicatedServer server){
this.ioGroup = new NioEventLoopGroup(1); //TODO especificar numero correcto de hilos
Expand All @@ -48,7 +49,7 @@ public RegionRouter(ProxyDedicatedServer server){

//Create a tunnel for each worker
for(ConnectionInfo connection : workers){
WorkerTunnel tunnel = new WorkerTunnel(new InetSocketAddress(connection.ip(),connection.port()),this);
WorkerTunnel tunnel = new WorkerTunnel(new InetSocketAddress(connection.ip(),connection.port()),this,server);
workerTunnels.put(connection.id(),tunnel);
}
}
Expand Down Expand Up @@ -139,7 +140,9 @@ public static int computeWorkerId(double x, double z, int nWorkers, int regionSi
}

/**
* @return The ID of the server allocated to this position (in block coordinates)
* @param x block coordinates
* @param z block coordinates
* @return The ID of the server allocated to this position
*/
public static int computeWorkerId(int x, int z, int nWorkers, int regionSize){
if(nWorkers == 1) return 1;
Expand All @@ -163,6 +166,59 @@ public static int computeWorkerId(int x, int z, int nWorkers, int regionSize){
return base + offset;
}

/**
* @param x chunk coordinates
* @param z chunk coordinates
* @return The ID of the server allocated to this position
*/
public static boolean isChunkInWorkerDomain(int x, int z){
if(nWorkers == 1) return true;
if (nWorkers == 2) return z >= 0 ? workerId == 1 : workerId == 2;
if (nWorkers % 4 != 0) throw new RuntimeException("Invalid number of workers n:"+nWorkers+". Valid numbers are 1, 2 or any other number divisible by 4");

//To file region cords 512*512
int regionX = x >> 5;
int regionZ = z >> 5;

int nWorkersCuad = nWorkers/4;
int quad = (regionX >= 0 ? 0 : 2) + (regionZ < 0 ? 1 : 0);
int base = quad * nWorkersCuad; //base id of the quad

//Longest cord in abs
int maxSide = Math.max(Math.abs(regionX),Math.abs(regionZ));

int nRegions = maxSide / regionSize;
int offset = (nRegions == 0) ? 1 : (int) Math.floor(Math.log(nRegions) / Math.log(2)) + 2;

return workerId == base + offset;
}

/**
* @param pChunkPos long format of chunk coordinates
* @return The ID of the server allocated to this position
*/
public static boolean isChunkInWorkerDomain(long pChunkPos){
//To file region cords 512*512
int x = ((int) (pChunkPos)) >> 5;
int z = ((int) (pChunkPos >>> 32)) >> 5;

if(nWorkers == 1) return true;
if (nWorkers == 2) return z >= 0 ? workerId == 1 : workerId == 2;
if (nWorkers % 4 != 0) throw new RuntimeException("Invalid number of workers n:"+nWorkers+". Valid numbers are 1, 2 or any other number divisible by 4");

int nWorkersCuad = nWorkers/4;
int quad = (x >= 0 ? 0 : 2) + (z < 0 ? 1 : 0);
int base = quad * nWorkersCuad; //base id of the quad

//Longest cord in abs
int maxSide = Math.max(Math.abs(x),Math.abs(z));

int nRegions = maxSide / regionSize;
int offset = (nRegions == 0) ? 1 : (int) Math.floor(Math.log(nRegions) / Math.log(2)) + 2;

return workerId == base + offset;
}

/**
* @return The ID of the server that manages the spawn area
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package com.manwe.dsl.dedicatedServer;

import com.manwe.dsl.dedicatedServer.proxy.back.listeners.ProxyListener;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundPlayerTransferACKPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundPlayerTransferPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerInitPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerDisconnectPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundContainerPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.*;
import com.manwe.dsl.dedicatedServer.worker.packets.*;
import com.manwe.dsl.dedicatedServer.worker.listeners.WorkerListener;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundContainerPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerTransferPacket;
import net.minecraft.network.ConnectionProtocol;
import net.minecraft.network.FriendlyByteBuf;
import net.minecraft.network.ProtocolInfo;
Expand All @@ -21,6 +16,7 @@ public class InternalGameProtocols {
.addPacket(InternalPacketTypes.PROXY_WORKER_CLIENT_LOGIN, WorkerBoundPlayerInitPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.PROXY_WORKER_CLIENT_DISCONNECT, WorkerBoundPlayerDisconnectPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.PROXY_WORKER_PLAYER_TRANSFER, WorkerBoundPlayerTransferPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.PROXY_WORKER_PLAYER_INIT_ACK, WorkerBoundPlayerInitACKPacket.STREAM_CODEC)
);
public static final ProtocolInfo<WorkerListener> SERVERBOUND = SERVERBOUND_TEMPLATE.bind(FriendlyByteBuf::new);

Expand All @@ -30,6 +26,8 @@ public class InternalGameProtocols {
consumer.addPacket(InternalPacketTypes.WORKER_PROXY_PACKET_CONTAINER, ProxyBoundContainerPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.WORKER_PROXY_PLAYER_TRANSFER, ProxyBoundPlayerTransferPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.WORKER_PROXY_PLAYER_TRANSFER_ACK, ProxyBoundPlayerTransferACKPacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.WORKER_PROXY_SAVE_PLAYER_STATE, ProxyBoundSavePlayerStatePacket.STREAM_CODEC)
.addPacket(InternalPacketTypes.WORKER_PROXY_PLAYER_INIT_ACK, ProxyBoundPlayerInitACKPacket.STREAM_CODEC)
);

public static final ProtocolInfo<ProxyListener> CLIENTBOUND = CLIENTBOUND_TEMPLATE.bind(FriendlyByteBuf::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package com.manwe.dsl.dedicatedServer;

import com.manwe.dsl.dedicatedServer.proxy.back.listeners.ProxyListener;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundContainerPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundPlayerTransferACKPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.ProxyBoundPlayerTransferPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerInitPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerDisconnectPacket;
import com.manwe.dsl.dedicatedServer.proxy.back.packets.*;
import com.manwe.dsl.dedicatedServer.worker.packets.*;
import com.manwe.dsl.dedicatedServer.worker.listeners.WorkerListener;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundContainerPacket;
import com.manwe.dsl.dedicatedServer.worker.packets.WorkerBoundPlayerTransferPacket;
import net.minecraft.network.protocol.Packet;
import net.minecraft.network.protocol.PacketFlow;
import net.minecraft.network.protocol.PacketType;
import net.minecraft.resources.ResourceLocation;

public class InternalPacketTypes {
public static final PacketType<WorkerBoundPlayerInitPacket> PROXY_WORKER_CLIENT_LOGIN = createServerbound("worker_player_login");
public static final PacketType<WorkerBoundPlayerInitACKPacket> PROXY_WORKER_PLAYER_INIT_ACK = createServerbound("worker_player_init_ack");
public static final PacketType<WorkerBoundPlayerDisconnectPacket> PROXY_WORKER_CLIENT_DISCONNECT = createServerbound("worker_player_disconnect");
public static final PacketType<WorkerBoundContainerPacket> PROXY_WORKER_PACKET_CONTAINER = createServerbound("worker_packet_container");
public static final PacketType<WorkerBoundPlayerTransferPacket> PROXY_WORKER_PLAYER_TRANSFER = createServerbound("worker_player_transfer");
Expand All @@ -28,6 +24,8 @@ private static <T extends Packet<WorkerListener>> PacketType<T> createServerboun
public static final PacketType<ProxyBoundContainerPacket> WORKER_PROXY_PACKET_CONTAINER = createClientbound("proxy_packet_container");
public static final PacketType<ProxyBoundPlayerTransferPacket> WORKER_PROXY_PLAYER_TRANSFER = createClientbound("proxy_player_transfer");
public static final PacketType<ProxyBoundPlayerTransferACKPacket> WORKER_PROXY_PLAYER_TRANSFER_ACK = createClientbound("proxy_player_transfer_ack");
public static final PacketType<ProxyBoundSavePlayerStatePacket> WORKER_PROXY_SAVE_PLAYER_STATE = createClientbound("proxy_save_player_state");
public static final PacketType<ProxyBoundPlayerInitACKPacket> WORKER_PROXY_PLAYER_INIT_ACK = createClientbound("proxy_player_init_ack");

private static <T extends Packet<ProxyListener>> PacketType<T> createClientbound(String pId) {
return new PacketType<>(PacketFlow.CLIENTBOUND, ResourceLocation.withDefaultNamespace(pId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import com.manwe.dsl.config.DSLServerConfigs;
import com.manwe.dsl.dedicatedServer.worker.LocalPlayerList;
import com.manwe.dsl.mixin.accessors.DedicatedServerAccessor;
import com.manwe.dsl.mixin.accessors.EntityAccessor;
import com.mojang.datafixers.DataFixer;
import net.minecraft.*;
import net.minecraft.server.*;
import net.minecraft.server.dedicated.*;
import net.minecraft.server.level.ServerPlayer;
import net.minecraft.server.level.progress.ChunkProgressListenerFactory;
import net.minecraft.server.packs.repository.PackRepository;
import net.minecraft.server.players.GameProfileCache;
Expand Down Expand Up @@ -39,9 +41,13 @@
public class ProxyDedicatedServer extends DedicatedServer {

URI arbiterUri = URI.create(DSLServerConfigs.ARBITER_ADDR.get());
boolean isProxy = DSLServerConfigs.IS_PROXY.get();
int workerSize = DSLServerConfigs.WORKER_SIZE.get();
int workerId = DSLServerConfigs.WORKER_ID.get();
ArbiterClient arbiterClient = new ArbiterClient(arbiterUri);
private DedicatedPlayerList localRemotePlayerListRef;
private ArbiterClient.ArbiterRes topology;
private static final int SHARD_MAX_ENTITIES = 1_000_000;

public ProxyDedicatedServer(Thread pServerThread, LevelStorageSource.LevelStorageAccess pStorageSource, PackRepository pPackRepository, WorldStem pWorldStem, DedicatedServerSettings pSettings, DataFixer pFixerUpper, Services pServices, ChunkProgressListenerFactory pProgressListenerFactory) {
super(pServerThread, pStorageSource, pPackRepository, pWorldStem, pSettings, pFixerUpper, pServices, pProgressListenerFactory);
Expand All @@ -53,6 +59,18 @@ public ProxyDedicatedServer(Thread pServerThread, LevelStorageSource.LevelStorag

@Override
public boolean initServer() throws IOException {
try {
topology = arbiterClient.fetch();
System.out.println("Port from Arbiter: "+topology.port);
} catch (Exception ex) {
DistributedServerLevels.LOGGER.error("Unexpected Error",ex);
throw new RuntimeException("Arbiter unavailable cannot fetch port and role");
}

if(!isProxy){
EntityAccessor.getEntityCounter().set((workerId - 1) * SHARD_MAX_ENTITIES); //Set exclusive entity ids for each worker
}

Thread thread = new Thread("Server console handler") {
@Override
public void run() {
Expand Down Expand Up @@ -100,16 +118,8 @@ public void run() {
inetaddress = InetAddress.getByName(this.getLocalIp());
}


try {
topology = arbiterClient.fetch();
this.setPort(topology.port);
System.out.println("From Arbiter: "+topology.port);
} catch (Exception ex) {
DistributedServerLevels.LOGGER.error("Unexpected Error",ex);
throw new RuntimeException("Arbiter unavailable cannot fetch port and role");
}

//SET PORT
this.setPort(topology.port);

this.initializeKeyPair();
DistributedServerLevels.LOGGER.info("Starting Minecraft server on {}:{}", this.getLocalIp().isEmpty() ? "*" : this.getLocalIp(), this.getPort());
Expand Down Expand Up @@ -146,7 +156,7 @@ public void run() {
if (!OldUsersConverter.serverReadyAfterUserconversion(this)) {
return false;
} else {
if(DSLServerConfigs.IS_PROXY.get()){
if(isProxy){
//Set RemotePlayerList
localRemotePlayerListRef = new RemotePlayerList(this, this.registries(), this.playerDataStorage);
}else {
Expand Down Expand Up @@ -210,6 +220,12 @@ public void tickServer(BooleanSupplier pHasTimeLeft) {
super.tickServer(pHasTimeLeft);
}

@Override
public void tickChildren(BooleanSupplier pHasTimeLeft) {
//System.out.println("TICK CHILDREN");
super.tickChildren(pHasTimeLeft);
}

public boolean isProxy(){
return DSLServerConfigs.IS_PROXY.get();
}
Expand Down
Loading
Loading