diff --git a/bin/apl-common.sh b/bin/apl-common.sh index e567559..007c626 100755 --- a/bin/apl-common.sh +++ b/bin/apl-common.sh @@ -71,7 +71,7 @@ jdk_version() { local result local IFS=$'\n' # remove \r for Cygwin - local lines=$("$JAVA_CMD" -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + local lines=$("$JAVA_CMD" -Xms32M -Xmx32M -XX:+PrintConcurrentLocks -version 2>&1 | tr '\r' '\n') if [[ -z $JAVA_CMD ]] then result=no_java diff --git a/conf/peers-15t.json b/conf/peers-15t.json index a67ab29..6b9c706 100644 --- a/conf/peers-15t.json +++ b/conf/peers-15t.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 2000, + "skipNotRespondingHost" : false, + "jettyServerPort" : 8274, "peersInfo": [ { "host": "51.158.190.152" diff --git a/conf/peers-1t.json b/conf/peers-1t.json index 52af2b5..cbeb569 100644 --- a/conf/peers-1t.json +++ b/conf/peers-1t.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 21000, + "skipNotRespondingHost" : false, + "jettyServerPort" : 8271, "peersInfo": [ { "host": "apl-t1-1.testnet.apollowallet.org", diff --git a/conf/peers-2t.json b/conf/peers-2t.json index 5491582..21b2e00 100644 --- a/conf/peers-2t.json +++ b/conf/peers-2t.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 2000, + "skipNotRespondingHost" : false, + "jettyServerPort" : 8272, "peersInfo": [ { "host": "localhost" diff --git a/conf/peers-3t.json b/conf/peers-3t.json index 0293dae..abe1bc7 100644 --- a/conf/peers-3t.json +++ b/conf/peers-3t.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 2000, + "skipNotRespondingHost" : false, + "jettyServerPort" : 8273, "peersInfo": [ {"host": "51.15.250.32" }, {"host": "51.15.253.171"}, diff --git a/conf/peers-tap.json b/conf/peers-tap.json index 61195ab..868917c 100644 --- a/conf/peers-tap.json +++ b/conf/peers-tap.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 2000, + "skipNotRespondingHost" : false, + "jettyServerPort" : 8275, "peersInfo": [ {"host": "51.15.104.205" , "schema": "http"}, {"host": "51.15.85.92" , "schema": "http"}, diff --git a/conf/peers.json b/conf/peers.json index 260d776..eea0ac9 100644 --- a/conf/peers.json +++ b/conf/peers.json @@ -2,6 +2,8 @@ "defaultPort": 7876, "warningLevel" : 720, "criticalLevel" : 21000, + "skipNotRespondingHost" : true, + "jettyServerPort" : 8270, "peersInfo": [ {"host": "3.141.234.52", "schema": "https"}, diff --git a/pom.xml b/pom.xml index 9d68022..a71fca0 100644 --- a/pom.xml +++ b/pom.xml @@ -297,4 +297,4 @@ - \ No newline at end of file + diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultService.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultService.java new file mode 100644 index 0000000..71db3aa --- /dev/null +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultService.java @@ -0,0 +1,18 @@ +package com.apollocurrency.aplwallet.apl.tools.impl.heightmon; + +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerInfo; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerMonitoringResult; + +import java.net.UnknownHostException; +import java.util.List; +import java.util.Map; + +/** + * Service to loop over host's list and gather Node's information by using HttpClient + */ +public interface FetchHostResultService { + Map getPeersMonitoringResults(); + List getPeerApiUrls(); + boolean addPeer(PeerInfo peerInfo) throws UnknownHostException; + List getAllPeers(); +} diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultServiceImpl.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultServiceImpl.java new file mode 100644 index 0000000..b6d5147 --- /dev/null +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/FetchHostResultServiceImpl.java @@ -0,0 +1,340 @@ +package com.apollocurrency.aplwallet.apl.tools.impl.heightmon; + +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.HeightMonitorConfig; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerInfo; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerMonitoringResult; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeersConfig; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ShardDTO; +import com.apollocurrency.aplwallet.apl.util.Version; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.PreDestroy; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +@Slf4j +@NoArgsConstructor +public class FetchHostResultServiceImpl implements FetchHostResultService { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final int CONNECT_TIMEOUT = 5_000; + private static final int IDLE_TIMEOUT = 5_000; + private static final int BLOCKS_TO_RETRIEVE = 1000; + private static final Version DEFAULT_VERSION = new Version("0.0.0"); + private static final String URL_FORMAT = "%s://%s:%d"; + private HttpClient client; + private List peers; + private int port; + private List peerApiUrls; + private ExecutorService executor; + private HeightMonitorConfig config; + + static { + objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + public FetchHostResultServiceImpl(HeightMonitorConfig config) { + log.debug("Init FetchHostResultService..."); + this.config = config; + PeersConfig peersConfig = this.config.getPeersConfig(); + this.port = peersConfig.getDefaultPort(); + this.peers = Collections.synchronizedList(peersConfig.getPeersInfo().stream().peek(this::setDefaultPortIfNull).toList()); + this.peerApiUrls = Collections.synchronizedList(this.peers.stream().map(this::createUrl).toList()); + + client = createHttpClient(); + try { + client.start(); + int numberOfThreads = this.peers.size() * 3; // http client threads in pool + executor = Executors.newFixedThreadPool(numberOfThreads); + log.debug("HTTP client started with pool of '{}' threads...", numberOfThreads); + } catch (Exception e) { + log.error("Http Client init error", e); + throw new RuntimeException(e.toString(), e); + } + } + + private HttpClient createHttpClient() { + SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(); + sslContextFactory.setTrustAll(true); + + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(sslContextFactory); + + HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector)); + httpClient.setIdleTimeout(IDLE_TIMEOUT); + httpClient.setConnectTimeout(CONNECT_TIMEOUT); + httpClient.setFollowRedirects(false); + return httpClient; + } + + @PreDestroy + public void shutdown() { + try { + if (this.client != null) { + client.stop(); + } + if (this.executor != null) { + executor.shutdownNow(); + } + } catch (Exception e) { + log.error("FetchHostResultService shutdown error...", e); +// throw new RuntimeException(e.toString(), e); + } + } + + @Override + public Map getPeersMonitoringResults() { + Map peerBlocks = new HashMap<>(peerApiUrls.size()); + List> getBlocksRequests = new ArrayList<>(peerApiUrls.size()); + for (int i = 0; i < peerApiUrls.size(); i++) { + String peerUrl = peerApiUrls.get(i); + int finalI = i; + getBlocksRequests.add(CompletableFuture.supplyAsync(() -> { + Map blocks = new HashMap<>(); + int height1 = getPeerHeight(peerUrl); + log.trace("processing peerUrl = '{}'", peerUrl); + int height2 = 0; + for (int j = finalI + 1; j < peerApiUrls.size(); j++) { + height2 = getPeerHeight(peerApiUrls.get(j)); + Block lastMutualBlock = findLastMutualBlock(height1, height2, peerUrl, peerApiUrls.get(j)); + if (lastMutualBlock != null) { + blocks.put(peerApiUrls.get(j), lastMutualBlock); + } + } + Version version = getPeerVersion(peerUrl); + List shards = getShards(peerUrl); + log.debug("DONE peerUrl = '{}' is live='{}'", peerUrl, height1 != -1); + return new PeerMonitoringResult(peerUrl, shards, height1, version, blocks, + height1 != -1, height2, this.config.getPeersConfig().getCriticalLevel()); + }, executor)); + } + for (int i = 0; i < getBlocksRequests.size(); i++) { + String host = peers.get(i).getHost(); + try { + peerBlocks.put(host, getBlocksRequests.get(i).get()); + } catch (Exception e) { + log.error("Error getting blocks for " + host, e); + } + } + return peerBlocks; + } + + public List getPeerApiUrls() { + return peerApiUrls; + } + + private List getShards(String peerUrl) { + List shards = new ArrayList<>(); + String uriToCall = peerUrl + "/rest/shards"; + log.trace("Call to Shards = {}", uriToCall); + Request request = client.newRequest(uriToCall) + .method(HttpMethod.GET); + ContentResponse response; + try { + response = request.send(); + shards = objectMapper.readValue(response.getContentAsString(), new TypeReference>() { + }); + } catch (InterruptedException | TimeoutException | ExecutionException | IOException e) { + log.error("Unable to get Shards or parse response from {} - {}", uriToCall, e.toString()); + } catch (Exception e) { + log.error("Unknown exception:", e); + } + log.trace("getShards result = {} by uri='{}'", shards, uriToCall); + return shards; + } + + private int getPeerHeight(String peerUrl) { + String uriToCall = peerUrl + "/apl"; + log.trace("Call to Remote = {}", uriToCall); + JsonNode jsonNode = performRequest(uriToCall, Map.of("requestType", "getBlock")); + int height = -1; + if (jsonNode != null) { + height = jsonNode.get("height").asInt(); + } + log.trace("getBlock peerHeight result = {} by uri='{}'", height, uriToCall); + return height; + } + + private long getPeerBlockId(String peerUrl, int height) { + String uriToCall = peerUrl + "/apl"; + JsonNode jsonNode = performRequest(uriToCall, Map.of("requestType", "getBlockId", "height", height)); + long blockId = -1; + if (jsonNode != null) { + blockId = Long.parseUnsignedLong(jsonNode.get("block").asText()); + } + log.trace("peerBlockId result = {}", blockId); + return blockId; + } + + private Block getPeerBlock(String peerUrl, int height) { + String uriToCall = peerUrl + "/apl"; + JsonNode node = performRequest(uriToCall, Map.of("requestType", "getBlock", "height", height)); + Block block = null; + if (node != null) { + try { + block = objectMapper.readValue(node.toString(), Block.class); + } catch (JsonProcessingException e) { + log.error("Unable to parse block from {} by peerUrl = '{}' at height = {}", node, peerUrl, height); + } + } + log.trace("peerBlock result = {}", block); + return block; + } + + private JsonNode performRequest(String url, Map params) { + JsonNode result = null; + log.trace("Call to Remote request = {} + {}", url, params); + Request request = client.newRequest(url) + .method(HttpMethod.GET); + params.forEach((name, value) -> request.param(name, value.toString())); + + ContentResponse response; + try { + response = request.send(); + if (response.getStatus() != 200) { + log.info("Bad response: from {}", request.getURI()); + } else { + JsonNode jsonNode = objectMapper.readTree(response.getContentAsString()); + if (jsonNode.has("errorDescription")) { + log.info("Error received from node: {} - {}", request.getURI(), jsonNode.get("errorDescription")); + } else { + result = jsonNode; + log.trace("Call result = {}", result); + } + } + } catch (InterruptedException | TimeoutException | ExecutionException | IOException e) { + log.info("Unable to get or parse response from {} {} - {}", url, params, e.toString()); + } catch (Exception e) { + log.info("Unknown exception:", e); + } + return result; + } + + private Version getPeerVersion(String peerUrl) { + Version res = DEFAULT_VERSION; + String uriToCall = peerUrl + "/apl"; + log.trace("Call to peerVersion = {}", uriToCall); + Request request = client.newRequest(uriToCall) + .method(HttpMethod.GET) + .param("requestType", "getBlockchainStatus"); + ContentResponse response = null; + try { + response = request.send(); + JsonNode jsonNode = objectMapper.readTree(response.getContentAsString()); + res = new Version(jsonNode.get("version").asText()); + log.trace("Call result = {}", res); + } catch (InterruptedException | TimeoutException | ExecutionException e) { + log.error("Unable to get peerVersion response from {} - {}", uriToCall, e.toString()); + } catch (IOException e) { + log.error("Unable to parse peerVersion from json for {} - {}", uriToCall, e.toString()); + } + return res; + } + + private Block findLastMutualBlock(int height1, int height2, String host1, String host2) { + log.trace("find Mutual between '{}' ({}) vs '{}' ({}) ...", host1, height1, host2, height2); + if (height2 == -1 || height1 == -1) { + return null; + } + int minHeight = Math.min(height1, height2); + int stHeight = minHeight; + int step = 1; + int firstMatchHeight = -1; + int mutualSearchDeepCount = BLOCKS_TO_RETRIEVE; + while (true) { + long peer1BlockId = getPeerBlockId(host1, stHeight); + long peer2BlockId = getPeerBlockId(host2, stHeight); + if ((peer1BlockId == peer2BlockId) && (peer2BlockId > 0)) { + firstMatchHeight = stHeight; + log.trace("found Mutual between '{}' ({}) vs '{}' ({}), match = {}", host1, height1, host2, height2, stHeight); + break; + } else { + if (stHeight <= 0 || mutualSearchDeepCount <= 0) { + log.debug("NOT found Mutual between '{}' ({}) vs '{}' ({}), match = {}, mutualSearchDeepCount = {}", + host1, height1, host2, height2, stHeight, mutualSearchDeepCount); + return null; // errors getting common block, 0 is reached (no network or similar) + } + stHeight = Math.max(0, stHeight - step); + step *= 2; + } + mutualSearchDeepCount--; + log.trace("nextLoop '{}' ({} / id = {}) vs '{}' ({} / id = {}), stHeight = {}, step = {}, mutualSearchDeepCount = {}", + host1, height1, peer1BlockId, host2, height2, peer2BlockId, stHeight, step, mutualSearchDeepCount); + } + Block block = null; + if (firstMatchHeight != -1) { + int tHeight = firstMatchHeight; + while (tHeight <= minHeight) { + long peer1BlockId = getPeerBlockId(host1, tHeight); + long peer2BlockId = getPeerBlockId(host2, tHeight); + if (peer2BlockId > 0 && peer1BlockId == peer2BlockId) { + block = getPeerBlock(host1, tHeight); + if (step == 1) { + break; + } + step = Math.max(step / 2, 1); + tHeight += step; + } else { + tHeight -= step; + } + } + } + + return block; + } + + @Override + public boolean addPeer(PeerInfo peer) { + Objects.requireNonNull(peer); + setDefaultPortIfNull(peer); + String url = createUrl(peer); + boolean result = false; + if (!peers.contains(peer)) { + result = true; + peers.add(peer); + peerApiUrls.add(url); + log.info("Added new peer: {}", peer.getHost()); + } + return result; + } + + @Override + public List getAllPeers() { + return peers; + } + + private void setDefaultPortIfNull(PeerInfo peerInfo) { + if (peerInfo.getPort() == null) { + peerInfo.setPort(port); + } + } + + private String createUrl(PeerInfo peerInfo) { + return String.format(URL_FORMAT, peerInfo.getSchema(), peerInfo.getHost(), peerInfo.getPort()); + } + +} diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitor.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitor.java index 2cd06d8..775db21 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitor.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitor.java @@ -23,6 +23,7 @@ public class HeightMonitor { private ScheduledExecutorService executor; private int delay; private AplContainer container; + private JettyServer jettyServer; public HeightMonitor(Integer delay) { this.executor = Executors.newScheduledThreadPool(1); @@ -45,6 +46,8 @@ public void start(HeightMonitorConfig config) { CDI.current().select(JettyServer.class).get(); Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); service.setUp(config); + this.jettyServer = new JettyServer(service); + this.jettyServer.start(); executor.scheduleWithFixedDelay(()->{ try { service.updateStats(); @@ -60,7 +63,8 @@ public void start(HeightMonitorConfig config) { public void stop() { try { executor.shutdown(); - container.shutdown(); + this.jettyServer.shutdown(); +// container.shutdown(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorService.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorService.java index 66785fd..f4e075f 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorService.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorService.java @@ -18,7 +18,5 @@ public interface HeightMonitorService { void setUp(HeightMonitorConfig config); - boolean addPeer(PeerInfo peerInfo) throws UnknownHostException; - - List getAllPeers(); + HeightMonitorConfig getConfig(); } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorServiceImpl.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorServiceImpl.java index 42b9749..808d146 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorServiceImpl.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/HeightMonitorServiceImpl.java @@ -9,168 +9,72 @@ import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerDiffStat; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerInfo; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerMonitoringResult; -import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeersConfig; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ShardDTO; -import com.apollocurrency.aplwallet.apl.util.Version; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic; -import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.io.ClientConnector; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.slf4j.Logger; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; - -import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; -import static org.slf4j.LoggerFactory.getLogger; +@Slf4j @Singleton +@NoArgsConstructor public class HeightMonitorServiceImpl implements HeightMonitorService { - - private static final Logger log = getLogger(HeightMonitorServiceImpl.class); - - private static final ObjectMapper objectMapper = new ObjectMapper(); - private static final List DEFAULT_PERIODS = List.of(1, 2, 4, 6, 8, 12, 24, 48, 96); - private static final int CONNECT_TIMEOUT = 5_000; - private static final int IDLE_TIMEOUT = 5_000; - private static final int BLOCKS_TO_RETRIEVE = 1000; - private static final Version DEFAULT_VERSION = new Version("0.0.0"); - private static final String URL_FORMAT = "%s://%s:%d"; - - static { - objectMapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false); - } - - private final AtomicReference lastStats = new AtomicReference<>(); - private HttpClient client; - private List peers; + /** + * Period value is specified in MINUTES !! + */ + private static final List DEFAULT_PERIODS = List.of(0, 1, 2, 4, 8, 10, 20, 40, 60, 80, 120); + private final AtomicReference lastStats = new AtomicReference<>(new NetworkStats(DEFAULT_PERIODS.size())); private List maxBlocksDiffCounters; - private int port; - private List peerApiUrls; - private ExecutorService executor; + private HeightMonitorConfig config; + private FetchHostResultService fetchHostResultService; // service to fetch response from hosts + private boolean skipNotRespondingHost; // skip from processing and showing 'not live' hosts in final result + private int criticalLevel; - public HeightMonitorServiceImpl() { - } - - @PostConstruct public void init() { - client = createHttpClient(); - try { - client.start(); - executor = Executors.newFixedThreadPool(30); - } catch (Exception e) { - throw new RuntimeException(e.toString(), e); - } - } - - - @Override - public List getAllPeers() { - return peers; + log.debug("Init HeightMonitorService..."); + this.fetchHostResultService = new FetchHostResultServiceImpl(this.config); } @Override public void setUp(HeightMonitorConfig config) { - PeersConfig peersConfig = config.getPeersConfig(); - this.port = peersConfig.getDefaultPort(); - this.peers = Collections.synchronizedList(peersConfig.getPeersInfo().stream().peek(this::setDefaultPortIfNull).collect(Collectors.toList())); + this.config = config; this.maxBlocksDiffCounters = createMaxBlocksDiffCounters(config.getMaxBlocksDiffPeriods() == null ? DEFAULT_PERIODS : config.getMaxBlocksDiffPeriods()); - this.peerApiUrls = Collections.synchronizedList(this.peers.stream().map(this::createUrl).collect(Collectors.toList())); - - } - - private PeerInfo setDefaultPortIfNull(PeerInfo peerInfo) { - if (peerInfo.getPort() == null) { - peerInfo.setPort(port); - } - return peerInfo; - } - - @Override - public boolean addPeer(PeerInfo peer) { - Objects.requireNonNull(peer); - setDefaultPortIfNull(peer); - String url = createUrl(peer); - boolean result = false; - if (!peers.contains(peer)) { - result = true; - peers.add(peer); - peerApiUrls.add(url); - log.info("Added new peer: {}", peer.getHost()); - } - return result; - } - - private String createUrl(PeerInfo peerInfo) { - return String.format(URL_FORMAT, peerInfo.getSchema(), peerInfo.getHost(), peerInfo.getPort()); - } - - - @PreDestroy - public void shutdown() { - try { - client.stop(); - executor.shutdownNow(); - } catch (Exception e) { - throw new RuntimeException(e.toString(), e); - } - } - - private HttpClient createHttpClient() { - SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(); - sslContextFactory.setTrustAll(true); - - ClientConnector clientConnector = new ClientConnector(); - clientConnector.setSslContextFactory(sslContextFactory); - - HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector)); - httpClient.setIdleTimeout(IDLE_TIMEOUT); - httpClient.setConnectTimeout(CONNECT_TIMEOUT); - httpClient.setFollowRedirects(false); - return httpClient; + this.config.setMaxBlocksDiffPeriods(config.getMaxBlocksDiffPeriods() == null ? DEFAULT_PERIODS : config.getMaxBlocksDiffPeriods()); + this.skipNotRespondingHost = this.config.getPeersConfig().isSkipNotRespondingHost(); + this.criticalLevel = this.getConfig().getPeersConfig().getCriticalLevel(); + init(); } private List createMaxBlocksDiffCounters(List maxBlocksDiffPeriods) { - return maxBlocksDiffPeriods.stream().map(MaxBlocksDiffCounter::new).collect(Collectors.toList()); + return maxBlocksDiffPeriods.stream().map(MaxBlocksDiffCounter::new).toList(); } + @Override public NetworkStats getLastStats() { return lastStats.get(); } + @Override + public HeightMonitorConfig getConfig() { + return this.config; + } + @Override public NetworkStats updateStats() { long start = System.currentTimeMillis(); - log.info("{} : ===========================================", new Date(start)); - Map peerBlocks = getPeersMonitoringResults(); - NetworkStats networkStats = new NetworkStats(); - for (PeerInfo peer : peers) { + log.info("=========================================== : started at {}", new Date(start)); + Map peerBlocks = this.fetchHostResultService.getPeersMonitoringResults(); + NetworkStats networkStats = new NetworkStats(this.maxBlocksDiffCounters.size()); + List allPeers = this.fetchHostResultService.getAllPeers(); + for (PeerInfo peer : allPeers) { PeerMonitoringResult result = peerBlocks.get(peer.getHost()); if (result != null) { - List shardList = result.getShards().stream().map(this::getShardHashFormatted).collect(Collectors.toList()); + List shardList = result.getShards().stream().map(this::getShardHashFormatted).toList(); log.info(String.format("%-16.16s - %8d - %s", peer.getHost(), result.getHeight(), String.join("->", shardList))); networkStats.getPeerHeight().put(peer.getHost(), result.getHeight()); networkStats.getPeerShards().put(peer.getHost(), shardList); @@ -178,33 +82,78 @@ public NetworkStats updateStats() { } log.info(String.format("%7.7s %7.7s %-16.16s %-16.16s %9.9s %7.7s %7.7s %8.8s %8.8s %-13.13s %-13.13s %20.20s", "diff1", "diff2", "peer1", "peer2", "milestone", "height1", "height2", "version1", "version2", "shard1", "shard2", "shard-status")); int currentMaxBlocksDiff = -1; - for (int i = 0; i < peers.size(); i++) { - String host1 = peers.get(i).getHost(); + for (int i = 0; i < allPeers.size(); i++) { + String host1 = allPeers.get(i).getHost(); PeerMonitoringResult targetMonitoringResult = peerBlocks.get(host1); - for (int j = i + 1; j < peers.size(); j++) { - String host2 = peers.get(j).getHost(); + if (this.skipNotRespondingHost && !targetMonitoringResult.isLiveHost()) { + continue; + } + for (int j = i + 1; j < allPeers.size(); j++) { + String host2 = allPeers.get(j).getHost(); PeerMonitoringResult comparedMonitoringResult = peerBlocks.get(host2); - Block lastMutualBlock = targetMonitoringResult.getPeerMutualBlocks().get(peerApiUrls.get(j)); + if (this.skipNotRespondingHost && !comparedMonitoringResult.isLiveHost()) { + continue; + } + Block lastMutualBlock = targetMonitoringResult.getPeerMutualBlocks().get( + this.fetchHostResultService.getPeerApiUrls().get(j)); int lastHeight = targetMonitoringResult.getHeight(); int blocksDiff1 = getBlockDiff(lastMutualBlock, lastHeight); + if ( Math.abs(blocksDiff1) > this.criticalLevel) { + // check and set 'isDownloading' + targetMonitoringResult.setDownloading(true); + } else { + // not all comparisons are suitable + targetMonitoringResult.setDownloading(false); + } int blocksDiff2 = getBlockDiff(lastMutualBlock, comparedMonitoringResult.getHeight()); + if ( Math.abs(blocksDiff2) > this.criticalLevel) { + // check and set 'isDownloading' + comparedMonitoringResult.setDownloading(true); + } else { + // not all comparisons are suitable + comparedMonitoringResult.setDownloading(false); + } int milestoneHeight = getMilestoneHeight(lastMutualBlock); - String shardsStatus = getShardsStatus(targetMonitoringResult, comparedMonitoringResult); + String shardsStatus = getShardsStatus( + targetMonitoringResult, comparedMonitoringResult, blocksDiff1, blocksDiff2, this.criticalLevel); String shard1 = getShardOrNothing(targetMonitoringResult); String shard2 = getShardOrNothing(comparedMonitoringResult); - currentMaxBlocksDiff = Math.max(blocksDiff1, currentMaxBlocksDiff); - log.info(String.format("%7d %7d %-16.16s %-16.16s %9d %7d %7d %8.8s %8.8s %-13.13s %-13.13s %20.20s", blocksDiff1, blocksDiff2, host1, host2, milestoneHeight, lastHeight, comparedMonitoringResult.getHeight(), targetMonitoringResult.getVersion(), comparedMonitoringResult.getVersion(), shard1, shard2, shardsStatus)); - networkStats.getPeerDiffStats().add(new PeerDiffStat(blocksDiff1, blocksDiff2, host1, host2, lastHeight, milestoneHeight, comparedMonitoringResult.getHeight(), targetMonitoringResult.getVersion(), comparedMonitoringResult.getVersion(), shard1, shard2, shardsStatus)); + if (!targetMonitoringResult.isDownloading()) { + // do not count downloading peer + currentMaxBlocksDiff = Math.max(blocksDiff1, currentMaxBlocksDiff); + } + log.info(String.format("%7d %7d %-16.16s %-16.16s %9d %7d %7d %8.8s %8.8s %-13.13s %-13.13s %20.20s", + blocksDiff1, blocksDiff2, host1, host2, milestoneHeight, lastHeight, + comparedMonitoringResult.getHeight(), targetMonitoringResult.getVersion(), + comparedMonitoringResult.getVersion(), shard1, shard2, shardsStatus)); + networkStats.getPeerDiffStats().add(new PeerDiffStat( + blocksDiff1, blocksDiff2, host1, host2, milestoneHeight, lastHeight, + comparedMonitoringResult.getHeight(), targetMonitoringResult.getVersion(), + comparedMonitoringResult.getVersion(), shard1, shard2, shardsStatus)); } } - log.info("========Current max diff {} =========", currentMaxBlocksDiff); + log.info("======== Current max diff {} =========", currentMaxBlocksDiff); networkStats.setCurrentMaxDiff(currentMaxBlocksDiff); - for (MaxBlocksDiffCounter maxBlocksDiffCounter : maxBlocksDiffCounters) { - maxBlocksDiffCounter.update(currentMaxBlocksDiff); - networkStats.getDiffForTime().put(maxBlocksDiffCounter.getPeriod(), maxBlocksDiffCounter.getValue()); + for (int i = 0; i < maxBlocksDiffCounters.size(); i++) { + MaxBlocksDiffCounter maxBlocksDiffCounter = maxBlocksDiffCounters.get(i); + // replace 'currentMaxBlocksDiff' with value from maxBlocksDiffCounter after update() + currentMaxBlocksDiff = maxBlocksDiffCounter.update(i, currentMaxBlocksDiff); + if (currentMaxBlocksDiff >= 0) { + networkStats.getDiffForTime().put(maxBlocksDiffCounter.getPeriod(), currentMaxBlocksDiff); + // check reset condition + if (i == maxBlocksDiffCounters.size() - 1 && networkStats.getDiffForTime().get(maxBlocksDiffCounter.getPeriod()) > 0) { + // reset MAP + networkStats.getDiffForTime().clear(); + // recreate 'maxBlocksDiffCounters' + this.maxBlocksDiffCounters = createMaxBlocksDiffCounters(config.getMaxBlocksDiffPeriods()); + } + } else { + networkStats.getDiffForTime().putIfAbsent(maxBlocksDiffCounter.getPeriod(), currentMaxBlocksDiff); + } } lastStats.set(networkStats); - log.info("{} sec : ===========================================", (System.currentTimeMillis() - start) / 1_000); + log.info("=========================================== : finished in {} sec", (System.currentTimeMillis() - start) / 1_000); + log.trace("DUMP = {}", networkStats.getDiffForTime()); return networkStats; } @@ -219,16 +168,28 @@ private String getShardOrNothing(PeerMonitoringResult targetMonitoringResult) { private String getShardHashFormatted(ShardDTO shardDTO) { String prunableZipHash = shardDTO.getPrunableZipHash(); - return shardDTO.getCoreZipHash().substring(0, 6) + (prunableZipHash != null ? ":" + prunableZipHash.substring(0, 6) : ""); + String coreZipHash = shardDTO.getCoreZipHash(); + if (coreZipHash != null) { + return coreZipHash.substring(0, 6) + (prunableZipHash != null ? ":" + prunableZipHash.substring(0, 6) : ""); + } else { + return "??????"; + } } - private String getShardsStatus(PeerMonitoringResult targetMonitoringResult, PeerMonitoringResult comparedMonitoringResult) { + private String getShardsStatus(PeerMonitoringResult targetMonitoringResult, + PeerMonitoringResult comparedMonitoringResult, + int blocksDiff1, + int blocksDiff2, + int criticalValue) { + if (!targetMonitoringResult.isLiveHost()) { + return "peer1=OFF-LINE?"; + } List targetShards = targetMonitoringResult.getShards(); List comparedShards = comparedMonitoringResult.getShards(); - String status = "OK"; + StringBuilder status = new StringBuilder("OK"); int comparedCounter = comparedShards.size() - 1; int targetCounter = targetShards.size() - 1; - while (targetCounter >= 0 && comparedCounter >= 0 && status.equals("OK")) { + while (targetCounter >= 0 && comparedCounter >= 0 && status.toString().equals("OK")) { ShardDTO comparedShard = comparedShards.get(comparedCounter); ShardDTO targetShard = targetShards.get(targetCounter); if (comparedShard.getShardId() > targetShard.getShardId()) { @@ -240,16 +201,20 @@ private String getShardsStatus(PeerMonitoringResult targetMonitoringResult, Peer continue; } if (targetShard.getShardHeight() != comparedShard.getShardHeight()) { - status = "HEIGHT DIFF FROM " + targetShards.get(targetCounter).getShardId(); + status = new StringBuilder("HEIGHT DIFF FROM " + targetShards.get(targetCounter).getShardId()); } else if (!targetShard.getCoreZipHash().equalsIgnoreCase(comparedShard.getCoreZipHash())) { - status = "CORE DIFF FROM " + targetShards.get(targetCounter).getShardId(); + status = new StringBuilder("CORE DIFF FROM " + targetShards.get(targetCounter).getShardId()); } else if (!Objects.equals(targetShard.getPrunableZipHash(), comparedShard.getPrunableZipHash())) { - status = "PRUN DIFF FROM " + targetShards.get(targetCounter).getShardId(); + status = new StringBuilder("PRUN DIFF FROM " + targetShards.get(targetCounter).getShardId()); } targetCounter--; comparedCounter--; } - return status; + log.trace("Trg = {}", targetMonitoringResult); + if (targetMonitoringResult.isDownloading() || (blocksDiff1 > criticalValue || blocksDiff2 > criticalValue)) { + status.append("/DOWNLOAD"); + } + return status.toString(); } private int getMilestoneHeight(Block lastMutualBlock) { @@ -269,187 +234,5 @@ private int getBlockDiff(Block lastMutualBlock, int lastHeight) { return blocksDiff; } - private Map getPeersMonitoringResults() { - Map peerBlocks = new HashMap<>(); - List> getBlocksRequests = new ArrayList<>(); - for (int i = 0; i < peerApiUrls.size(); i++) { - String peerUrl = peerApiUrls.get(i); - int finalI = i; - getBlocksRequests.add(CompletableFuture.supplyAsync(() -> { - Map blocks = new HashMap<>(); - int height1 = getPeerHeight(peerUrl); - for (int j = finalI + 1; j < peerApiUrls.size(); j++) { - int height2 = getPeerHeight(peerApiUrls.get(j)); - Block lastMutualBlock = findLastMutualBlock(height1, height2, peerUrl, peerApiUrls.get(j)); - blocks.put(peerApiUrls.get(j), lastMutualBlock); - } - Version version = getPeerVersion(peerUrl); - List shards = getShards(peerUrl); - return new PeerMonitoringResult(shards, height1, version, blocks); - }, executor)); - } - for (int i = 0; i < getBlocksRequests.size(); i++) { - String host = peers.get(i).getHost(); - try { - peerBlocks.put(host, getBlocksRequests.get(i).get()); - } catch (Exception e) { - log.error("Error getting blocks for " + host, e); - } - } - return peerBlocks; - } - - private List getShards(String peerUrl) { - List shards = new ArrayList<>(); - String uriToCall = peerUrl + "/rest/shards"; - log.trace("Call to Shards = {}", uriToCall); - Request request = client.newRequest(uriToCall) - .method(HttpMethod.GET); - ContentResponse response; - try { - response = request.send(); - shards = objectMapper.readValue(response.getContentAsString(), new TypeReference>() { - }); - } catch (InterruptedException | TimeoutException | ExecutionException | IOException e) { - log.error("Unable to get Shards or parse response from {} - {}", uriToCall, e.toString()); - } catch (Exception e) { - log.error("Unknown exception:", e); - } - log.trace("getShards result = {}", shards); - return shards; - } - - private int getPeerHeight(String peerUrl) { - String uriToCall = peerUrl + "/apl"; - log.trace("Call to Remote = {}", uriToCall); - JsonNode jsonNode = performRequest(uriToCall, Map.of("requestType", "getBlock")); - int height = -1; - if (jsonNode != null) { - height = jsonNode.get("height").asInt(); - } - log.trace("peerHeight result = {}", height); - return height; - } - - private long getPeerBlockId(String peerUrl, int height) { - String uriToCall = peerUrl + "/apl"; - JsonNode jsonNode = performRequest(uriToCall, Map.of("requestType", "getBlockId", "height", height)); - long blockId = 0; - if (jsonNode != null) { - blockId = Long.parseUnsignedLong(jsonNode.get("block").asText()); - } - log.trace("peerBlockId result = {}", blockId); - return blockId; - } - - private Block getPeerBlock(String peerUrl, int height) { - String uriToCall = peerUrl + "/apl"; - JsonNode node = performRequest(uriToCall, Map.of("requestType", "getBlock", "height", height)); - Block block = null; - if (node != null) { - try { - block = objectMapper.readValue(node.toString(), Block.class); - } catch (JsonProcessingException e) { - log.error("Unable to parse block from {} by peerUrl = '{}' at height = {}", node, peerUrl, height); - } - } - log.trace("peerBlock result = {}", block); - return block; - } - - private JsonNode performRequest(String url, Map params) { - JsonNode result = null; - log.trace("Call to Remote request = {} + {}", url, params); - Request request = client.newRequest(url) - .method(HttpMethod.GET); - params.forEach((name, value) -> request.param(name, value.toString())); - - ContentResponse response; - try { - response = request.send(); - if (response.getStatus() != 200) { - log.info("Bad response: from {}", request.getURI()); - } else { - JsonNode jsonNode = objectMapper.readTree(response.getContentAsString()); - if (jsonNode.has("errorDescription")) { - log.info("Error received from node: {} - {}", request.getURI(), jsonNode.get("errorDescription")); - } else { - result = jsonNode; - log.trace("Call result = {}", result); - } - } - } catch (InterruptedException | TimeoutException | ExecutionException | IOException e) { - log.info("Unable to get or parse response from {} - {}", url, e.toString()); - } catch (Exception e) { - log.info("Unknown exception:", e); - } - return result; - } - - private Version getPeerVersion(String peerUrl) { - Version res = DEFAULT_VERSION; - String uriToCall = peerUrl + "/apl"; - log.trace("Call to peerVersion = {}", uriToCall); - Request request = client.newRequest(uriToCall) - .method(HttpMethod.GET) - .param("requestType", "getBlockchainStatus"); - ContentResponse response = null; - try { - response = request.send(); - JsonNode jsonNode = objectMapper.readTree(response.getContentAsString()); - res = new Version(jsonNode.get("version").asText()); - log.trace("Call result = {}", res); - } catch (InterruptedException | TimeoutException | ExecutionException e) { - log.error("Unable to get peerVersion response from {} - {}", uriToCall, e.toString()); - } catch (IOException e) { - log.error("Unable to parse peerVersion from json for {} - {}", uriToCall, e.toString()); - } - return res; - } - - private Block findLastMutualBlock(int height1, int height2, String host1, String host2) { - int minHeight = Math.min(height1, height2); - int stHeight = minHeight; - int step = 1024; - int firstMatchHeight = -1; - if (height2 == -1 || height1 == -1) { - return null; - } - while (true) { - long peer1BlockId = getPeerBlockId(host1, stHeight); - long peer2BlockId = getPeerBlockId(host2, stHeight); - if (peer1BlockId == peer2BlockId) { - firstMatchHeight = stHeight; - break; - } else { - if (stHeight == 0) { - break; - } - stHeight = Math.max(0, stHeight - step); - step *= 2; - } - } - Block block = null; - if (firstMatchHeight != -1) { - int tHeight = firstMatchHeight; - while (tHeight <= minHeight) { - long peer1BlockId = getPeerBlockId(host1, tHeight); - long peer2BlockId = getPeerBlockId(host2, tHeight); - if (peer1BlockId == peer2BlockId) { - block = getPeerBlock(host1, tHeight); - if (step == 1) { - break; - } - step = Math.max(step / 2, 1); - tHeight += step; - } else { - tHeight -= step; - } - } - } - - return block; - } - } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/MaxBlocksDiffCounter.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/MaxBlocksDiffCounter.java index 62d11dc..bffacd9 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/MaxBlocksDiffCounter.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/MaxBlocksDiffCounter.java @@ -5,28 +5,42 @@ package com.apollocurrency.aplwallet.apl.tools.impl.heightmon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; +import java.time.Duration; +import java.time.LocalDateTime; + +@Slf4j public class MaxBlocksDiffCounter { - private static final Logger log = LoggerFactory.getLogger(MaxBlocksDiffCounter.class); - private int period; + private final int period; private int value; - private long lastResetTime = System.currentTimeMillis() / (1000 * 60); + private LocalDateTime createdDateTime; + private Duration durationOnPeriod; public MaxBlocksDiffCounter(int period) { this.period = period; + this.durationOnPeriod = Duration.ofMinutes(period); + this.createdDateTime = LocalDateTime.now() + .plus(this.durationOnPeriod); + log.debug("Created for date-time : {}", this.createdDateTime); } - public void update(int currentBlockDiff) { - value = Math.max(value, currentBlockDiff); - log.info("MAX Blocks diff for last {}h is {} blocks", period, value); - long currentTime = System.currentTimeMillis() / 1000 / 60; - if (currentTime - lastResetTime >= period * 60) { - lastResetTime = currentTime; - value = currentBlockDiff; + public int update(int index, int currentBlockDiff) { + int result = -1; + LocalDateTime currentTime = LocalDateTime.now(); + log.trace("period = [{}], createdDateTime = '{}', currentTime = '{}' > ? {}", + period, createdDateTime, currentTime, (currentTime.isAfter(this.createdDateTime))); + if (currentTime.isAfter(this.createdDateTime)) { + result = this.value; + if (index == 0) { + this.value = currentBlockDiff; // always assign new value to zero item + } else { + this.value = Math.max(value, currentBlockDiff); // assign max value to the rest of items + } } + log.info("MAX Blocks diff for last {} hours is '{}' blocks {}", period / 60, result, result != -1 ? "*" : ""); + return result; } public int getValue() { @@ -37,7 +51,4 @@ public int getPeriod() { return period; } - public void setPeriod(int period) { - this.period = period; - } } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkEnum.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkEnum.java new file mode 100644 index 0000000..84cdb90 --- /dev/null +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkEnum.java @@ -0,0 +1,21 @@ +package com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model; + +/** + * Status that should be reported by Nagios plugin to Nagios monitoring system. + * Possible values : + * - OK = 0 - service is appeared to be functioning properly, no forks + * - WARNING = 1 - fork is above minimal "warning" threshold value or did not appear to be working properly + * - CRITICAL = 2 - fork is above maximum "critical" threshold value and needs very deep manual fixes + */ +public enum ForkEnum { + OK(0), + WARNING(1), + CRITICAL(2); + + private final int value; + + ForkEnum(int value) { + this.value = value; + } + +} diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkStatus.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkStatus.java new file mode 100644 index 0000000..0b49f9c --- /dev/null +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ForkStatus.java @@ -0,0 +1,22 @@ +package com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Composed Fork Status with 'max block diff' value(s) included + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ForkStatus { + /** + * Common fork status + */ + ForkEnum status = ForkEnum.OK; + /** + * Maximum fork difference found at latest time + */ + int maxDiff = -1; +} diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/NetworkStats.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/NetworkStats.java index 60d6901..4d8356b 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/NetworkStats.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/NetworkStats.java @@ -26,8 +26,8 @@ public NetworkStats(List peerDiffStats, int currentMaxDiff, Map(), -1, new LinkedHashMap<>(), new HashMap<>(), new HashMap<>()); + public NetworkStats(int size) { + this(new ArrayList<>(), -1, new LinkedHashMap<>(size), new HashMap<>(), new HashMap<>()); } public Map> getPeerShards() { diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeerMonitoringResult.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeerMonitoringResult.java index bd2a632..693082c 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeerMonitoringResult.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeerMonitoringResult.java @@ -11,16 +11,35 @@ import java.util.Map; public class PeerMonitoringResult { + private String peerUrl; private List shards; private int height; private Version version; private Map peerMutualBlocks; + private boolean isLiveHost = false; + private boolean isDownloading = false; - public PeerMonitoringResult(List shards, int height, Version version, Map peerMutualBlocks) { + public PeerMonitoringResult(String peerUrl, + List shards, + int height, // main host + Version version, + Map peerMutualBlocks, + boolean isLiveHost, + int height2, // second comparing host + int criticalHeightValue) { + this.peerUrl = peerUrl; this.shards = shards; this.height = height; this.version = version; this.peerMutualBlocks = peerMutualBlocks; + this.isLiveHost = isLiveHost; + if (isLiveHost && this.peerMutualBlocks != null + && this.peerMutualBlocks.values().stream().findFirst().isPresent() + && height2 > 0) { + // we evaluate peer downloading BH if it has difference between 'mutual block height' bigger then 'critical' config value + this.isDownloading = + Math.abs(this.peerMutualBlocks.values().stream().findFirst().get().getHeight() - height2) > criticalHeightValue; + } } public Map getPeerMutualBlocks() { @@ -54,4 +73,30 @@ public List getShards() { public void setShards(List shards) { this.shards = shards; } + + public boolean isLiveHost() { + return isLiveHost; + } + + public boolean isDownloading() { + return isDownloading; + } + + public void setDownloading(boolean downloading) { + isDownloading = downloading; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("PeerMonitoringResult{"); + sb.append("peerUrl='").append(peerUrl).append('\''); + sb.append(", isLiveHost=").append(isLiveHost); + sb.append(", this.isDownloading=").append(isDownloading); + sb.append(", height=").append(height); + sb.append(", version=").append(version); + sb.append(", shards=").append(shards); + sb.append(", peerMutualBlocks=").append(peerMutualBlocks); + sb.append('}'); + return sb.toString(); + } } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeersConfig.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeersConfig.java index a772797..5c45244 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeersConfig.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/PeersConfig.java @@ -11,43 +11,49 @@ import java.util.List; public class PeersConfig { - private static final int DEFAULT_PORT = 7876; - private List peersInfo; - private int defaultPort; - private int warningLevel = 720; //number of blocks that forms fork - private int criticalLevel = 21000; // apl.maxRollback Apollo constant + private final List peersInfo; + private final int defaultPort; + private final int warningLevel; //number of blocks that considered as a fork + private final int criticalLevel; // apl.maxRollback Apollo constant + private final boolean skipNotRespondingHost; // skip not responding host from processing + private final int jettyServerPort; @JsonCreator public PeersConfig(@JsonProperty("peersInfo") List peersInfo, @JsonProperty("defaultPort") int defaultPort, @JsonProperty("warningLevel") int warningLevel, - @JsonProperty("criticalLevel") int criticalLevel) { + @JsonProperty("criticalLevel") int criticalLevel, + @JsonProperty("skipNotRespondingHost") boolean skipNotRespondingHost, + @JsonProperty("jettyServerPort") int jettyServerPort) { this.peersInfo = peersInfo; this.defaultPort = defaultPort; this.warningLevel = warningLevel; this.criticalLevel = criticalLevel; - } - - public PeersConfig(@JsonProperty("peersInfo") List peersInfo, - @JsonProperty("warningLevel") int warningLevel, - @JsonProperty("criticalLevel") int criticalLevel - ) { - this(peersInfo, DEFAULT_PORT, warningLevel, criticalLevel); + this.skipNotRespondingHost = skipNotRespondingHost; + this.jettyServerPort = jettyServerPort; } public List getPeersInfo() { return peersInfo; } - public void setPeersInfo(List peersInfo) { - this.peersInfo = peersInfo; - } - public int getDefaultPort() { return defaultPort; } - public void setDefaultPort(int defaultPort) { - this.defaultPort = defaultPort; + public int getWarningLevel() { + return warningLevel; + } + + public int getCriticalLevel() { + return criticalLevel; + } + + public boolean isSkipNotRespondingHost() { + return skipNotRespondingHost; + } + + public int getJettyServerPort() { + return jettyServerPort; } } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ShardDTO.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ShardDTO.java index ae0f15b..c307762 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ShardDTO.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/model/ShardDTO.java @@ -82,4 +82,14 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(shardId, shardHash, coreZipHash, prunableZipHash, shardHeight, shardState); } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ShardDTO{"); + sb.append("shardId=").append(shardId); + sb.append(", shardHash='").append(shardHash != null && shardHash.length() > 6 ? shardHash.substring(0, 6) : "error").append('\''); + sb.append(", shardHeight=").append(shardHeight); + sb.append('}'); + return sb.toString(); + } } diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/JettyServer.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/JettyServer.java index 7df0faa..908a9db 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/JettyServer.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/JettyServer.java @@ -4,6 +4,8 @@ package com.apollocurrency.aplwallet.apl.tools.impl.heightmon.web; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.HeightMonitorService; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; @@ -15,17 +17,19 @@ import org.jboss.weld.environment.servlet.Listener; import org.jboss.weld.module.web.servlet.WeldInitialListener; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import jakarta.inject.Singleton; +import java.util.Objects; @Slf4j -@Singleton +@NoArgsConstructor public class JettyServer { private Server server; public static final String rootPathSpec = "/rest/*"; - public JettyServer() { + private HeightMonitorService heightMonitorService; + + public JettyServer(HeightMonitorService heightMonitorService) { + Objects.requireNonNull(heightMonitorService, "heightMonitorService is NULL"); + this.heightMonitorService = heightMonitorService; server = new Server(); HttpConfiguration configuration = new HttpConfiguration(); configuration.setSendDateHeader(false); @@ -33,7 +37,7 @@ public JettyServer() { HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(configuration); ServerConnector connector = new ServerConnector(server, httpConnectionFactory); - connector.setPort(7872); + connector.setPort(this.heightMonitorService.getConfig().getPeersConfig().getJettyServerPort()); connector.setHost("0.0.0.0"); connector.setReuseAddress(true); server.addConnector(connector); @@ -57,7 +61,6 @@ public JettyServer() { server.setHandler(servletHandler); } - @PostConstruct public void start() { try { server.start(); @@ -66,7 +69,6 @@ public void start() { } } - @PreDestroy public void shutdown() { try { server.stop(); diff --git a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatController.java b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatController.java index 087aa0f..31bc056 100644 --- a/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatController.java +++ b/src/main/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatController.java @@ -4,11 +4,20 @@ package com.apollocurrency.aplwallet.apl.tools.impl.heightmon.web; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.FetchHostResultService; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.HeightMonitorService; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ForkEnum; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ForkStatus; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.HeightMonitorConfig; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.NetworkStats; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerDiffStat; import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeerInfo; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.media.Content; +import io.swagger.v3.oas.annotations.media.Schema; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import jakarta.annotation.security.PermitAll; import jakarta.inject.Inject; import jakarta.inject.Singleton; import jakarta.validation.constraints.NotNull; @@ -20,16 +29,30 @@ import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import java.net.UnknownHostException; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; +import java.util.Optional; +@Slf4j @Path("/netstat") @Singleton +@NoArgsConstructor public class NetStatController { - @Inject private HeightMonitorService heightMonitorService; + private FetchHostResultService fetchHostResultService; + + + @Inject + public NetStatController(HeightMonitorService heightMonitorService, + FetchHostResultService fetchHostResultService) { + this.heightMonitorService = heightMonitorService; + this.fetchHostResultService = fetchHostResultService; + } @GET @Produces(MediaType.APPLICATION_JSON) @@ -46,7 +69,7 @@ public Response getStats() { @Path("/peers") @Produces(MediaType.APPLICATION_JSON) public Response getAllPeers() { - return Response.ok(heightMonitorService.getAllPeers()).build(); + return Response.ok(fetchHostResultService.getAllPeers()).build(); } @POST @@ -61,7 +84,7 @@ public Response addPeer(@NotNull @QueryParam("ip") String ip, @QueryParam("port" if (port != null) { peerInfo.setPort(port); } - return Response.ok(heightMonitorService.addPeer(peerInfo)).build(); + return Response.ok(fetchHostResultService.addPeer(peerInfo)).build(); } catch (UnknownHostException e) { return Response.status(422, e.getLocalizedMessage()).build(); } @@ -76,11 +99,61 @@ public Response getPeerStats(@NotNull @PathParam("ip") String ip) { .stream() .filter(peerDiffStat -> peerDiffStat.getPeer1().equalsIgnoreCase(ip) || peerDiffStat.getPeer2().equalsIgnoreCase(ip)) - .collect(Collectors.toList()); + .toList(); if (diffStats.isEmpty()) { return Response.status(422, "No monitoring data found for peer " + ip).build(); } else { return Response.ok(diffStats).build(); } } + + @GET + @Path("/fork") + @Produces(MediaType.APPLICATION_JSON) + @Operation( + summary = "Returns fork status", + description = "Returns fork status. Possible values are: OK = 0, Warning = 1, Critical = 2", + responses = { + @ApiResponse(responseCode = "200", description = "Successful execution", + content = @Content(mediaType = "application/json", + schema = @Schema(implementation = ForkStatus.class))) + }) + @PermitAll + public Response getForkStatus() { + log.debug("Getting getForkStatus..."); + HeightMonitorConfig config = this.heightMonitorService.getConfig(); + List maxBlocksDiffPeriods = config.getMaxBlocksDiffPeriods(); + log.debug("maxBlocksDiffPeriods = [{}] / empty ? = {}", maxBlocksDiffPeriods.size(), maxBlocksDiffPeriods.isEmpty()); + ForkStatus forkStatus = new ForkStatus(); + if (maxBlocksDiffPeriods.isEmpty()) { + return Response.ok(forkStatus).build(); + } + int minTimeValue = maxBlocksDiffPeriods.get(0); + log.debug("minTimeValue = {}", minTimeValue); + Optional> maxDiffAtLatestHour = heightMonitorService.getLastStats().getDiffForTime().entrySet().stream().filter(entry -> entry.getKey() == minTimeValue).findFirst(); + if (maxDiffAtLatestHour.isEmpty()) { + log.debug("maxDiffAtLatestHour is empty !"); + forkStatus = new ForkStatus(); + return Response.ok(forkStatus).build(); + } + + int criticalLevel = config.getPeersConfig().getCriticalLevel(); + int warningLevel = config.getPeersConfig().getWarningLevel(); + Integer maxDiffMapValue = maxDiffAtLatestHour.get().getValue(); + if (maxDiffMapValue < criticalLevel + && maxDiffMapValue < warningLevel) { + log.debug("OK level, max diff = {}", maxDiffMapValue); + forkStatus = new ForkStatus(ForkEnum.OK, maxDiffMapValue); + + } else if (maxDiffMapValue < criticalLevel + && maxDiffMapValue >= warningLevel) { + log.debug("WARNING level, max diff = {}", maxDiffMapValue); + forkStatus = new ForkStatus(ForkEnum.WARNING, maxDiffMapValue); + + } else if (maxDiffMapValue >= criticalLevel) { + log.debug("CRITICAL level, max diff = {}", maxDiffMapValue); + forkStatus = new ForkStatus(ForkEnum.CRITICAL, maxDiffMapValue); + } + return Response.ok(forkStatus).build(); + } } diff --git a/src/test/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatControllerTest.java b/src/test/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatControllerTest.java new file mode 100644 index 0000000..a584eac --- /dev/null +++ b/src/test/java/com/apollocurrency/aplwallet/apl/tools/impl/heightmon/web/NetStatControllerTest.java @@ -0,0 +1,186 @@ +package com.apollocurrency.aplwallet.apl.tools.impl.heightmon.web; + +import com.apollocurrency.aplwallet.api.dto.ShardDTO; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.FetchHostResultService; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.HeightMonitorService; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ForkEnum; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.ForkStatus; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.HeightMonitorConfig; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.NetworkStats; +import com.apollocurrency.aplwallet.apl.tools.impl.heightmon.model.PeersConfig; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import lombok.SneakyThrows; +import org.jboss.resteasy.mock.MockDispatcherFactory; +import org.jboss.resteasy.mock.MockHttpRequest; +import org.jboss.resteasy.mock.MockHttpResponse; +import org.jboss.resteasy.spi.Dispatcher; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NetStatControllerTest { + + private static ObjectMapper mapper = new ObjectMapper(); + private Dispatcher dispatcher; + private NetStatController controller; + @Mock + private HeightMonitorService heightMonitorService; + @Mock + private FetchHostResultService fetchHostResultService; + @Mock + private HeightMonitorConfig config; + + @BeforeEach + void setUp() { + this.dispatcher = MockDispatcherFactory.createDispatcher(); + this.controller = new NetStatController(heightMonitorService, fetchHostResultService); + dispatcher.getRegistry().addSingletonResource(controller); + } + + @SneakyThrows + @Test + void getForkStatus_maxBlocksDiffPeriods_Empty() { + when(config.getMaxBlocksDiffPeriods()).thenReturn(List.of()); + when(heightMonitorService.getConfig()).thenReturn(config); + + MockHttpRequest request = MockHttpRequest.get("/netstat/fork").contentType(MediaType.APPLICATION_JSON_TYPE); + MockHttpResponse response = new MockHttpResponse(); + + dispatcher.invoke(request, response); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String forkJson = response.getContentAsString(); + ForkStatus forkStatus = mapper.readValue(forkJson, new TypeReference<>() {}); + assertEquals(ForkEnum.OK, forkStatus.getStatus()); + assertEquals(-1, forkStatus.getMaxDiff()); + + verify(heightMonitorService, never()).getLastStats(); + } + + @SneakyThrows + @Test + void getForkStatus_stats_Empty() { + when(config.getMaxBlocksDiffPeriods()).thenReturn(List.of(0, 1, 2)); + when(heightMonitorService.getConfig()).thenReturn(config); + NetworkStats stats = mock(NetworkStats.class); + when(stats.getDiffForTime()).thenReturn(Map.of()); + when(heightMonitorService.getLastStats()).thenReturn(stats); + + MockHttpRequest request = MockHttpRequest.get("/netstat/fork").contentType(MediaType.APPLICATION_JSON_TYPE); + MockHttpResponse response = new MockHttpResponse(); + + dispatcher.invoke(request, response); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String forkJson = response.getContentAsString(); + ForkStatus forkStatus = mapper.readValue(forkJson, new TypeReference<>() {}); + assertEquals(ForkEnum.OK, forkStatus.getStatus()); + assertEquals(-1, forkStatus.getMaxDiff()); + + verify(heightMonitorService, timeout(1)).getLastStats(); + } + + @SneakyThrows + @Test + void getForkStatus_OK_level() { + when(config.getMaxBlocksDiffPeriods()).thenReturn(List.of(0, 1, 2)); + when(heightMonitorService.getConfig()).thenReturn(config); + PeersConfig peersConfig = mock(PeersConfig.class); + when(peersConfig.getWarningLevel()).thenReturn(720); + when(peersConfig.getCriticalLevel()).thenReturn(2000); + when(config.getPeersConfig()).thenReturn(peersConfig); + when(heightMonitorService.getConfig()).thenReturn(config); + NetworkStats stats = mock(NetworkStats.class); + int maxDiffValue = 719; + when(stats.getDiffForTime()).thenReturn(Map.of(0, maxDiffValue)); + when(heightMonitorService.getLastStats()).thenReturn(stats); + + MockHttpRequest request = MockHttpRequest.get("/netstat/fork").contentType(MediaType.APPLICATION_JSON_TYPE); + MockHttpResponse response = new MockHttpResponse(); + + dispatcher.invoke(request, response); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String forkJson = response.getContentAsString(); + ForkStatus forkStatus = mapper.readValue(forkJson, new TypeReference<>() {}); + assertEquals(ForkEnum.OK, forkStatus.getStatus()); + assertEquals(maxDiffValue, forkStatus.getMaxDiff()); + + verify(heightMonitorService, timeout(1)).getLastStats(); + } + + @SneakyThrows + @Test + void getForkStatus_WARN_level() { + when(config.getMaxBlocksDiffPeriods()).thenReturn(List.of(0, 1, 2)); + when(heightMonitorService.getConfig()).thenReturn(config); + PeersConfig peersConfig = mock(PeersConfig.class); + when(peersConfig.getWarningLevel()).thenReturn(720); + when(peersConfig.getCriticalLevel()).thenReturn(2000); + when(config.getPeersConfig()).thenReturn(peersConfig); + when(heightMonitorService.getConfig()).thenReturn(config); + NetworkStats stats = mock(NetworkStats.class); + int maxDiffValue = 720; + when(stats.getDiffForTime()).thenReturn(Map.of(0, maxDiffValue)); + when(heightMonitorService.getLastStats()).thenReturn(stats); + + MockHttpRequest request = MockHttpRequest.get("/netstat/fork").contentType(MediaType.APPLICATION_JSON_TYPE); + MockHttpResponse response = new MockHttpResponse(); + + dispatcher.invoke(request, response); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String forkJson = response.getContentAsString(); + ForkStatus forkStatus = mapper.readValue(forkJson, new TypeReference<>() {}); + assertEquals(ForkEnum.WARNING, forkStatus.getStatus()); + assertEquals(maxDiffValue, forkStatus.getMaxDiff()); + + verify(heightMonitorService, timeout(1)).getLastStats(); + } + + @SneakyThrows + @Test + void getForkStatus_CRITICAL_level() { + when(config.getMaxBlocksDiffPeriods()).thenReturn(List.of(0, 1, 2)); + when(heightMonitorService.getConfig()).thenReturn(config); + PeersConfig peersConfig = mock(PeersConfig.class); + when(peersConfig.getWarningLevel()).thenReturn(720); + when(peersConfig.getCriticalLevel()).thenReturn(2000); + when(config.getPeersConfig()).thenReturn(peersConfig); + when(heightMonitorService.getConfig()).thenReturn(config); + NetworkStats stats = mock(NetworkStats.class); + int maxDiffValue = 2_000; + when(stats.getDiffForTime()).thenReturn(Map.of(0, maxDiffValue)); + when(heightMonitorService.getLastStats()).thenReturn(stats); + + MockHttpRequest request = MockHttpRequest.get("/netstat/fork").contentType(MediaType.APPLICATION_JSON_TYPE); + MockHttpResponse response = new MockHttpResponse(); + + dispatcher.invoke(request, response); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + String forkJson = response.getContentAsString(); + ForkStatus forkStatus = mapper.readValue(forkJson, new TypeReference<>() {}); + assertEquals(ForkEnum.CRITICAL, forkStatus.getStatus()); + assertEquals(maxDiffValue, forkStatus.getMaxDiff()); + + verify(heightMonitorService, timeout(1)).getLastStats(); + } +} \ No newline at end of file