diff --git a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java
index 9da585e14..08c679b4b 100644
--- a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java
+++ b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java
@@ -26,9 +26,10 @@
import org.slf4j.LoggerFactory;
import javax.servlet.ServletException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import static org.apache.unomi.healthcheck.HealthCheckConfig.CONFIG_AUTH_REALM;
@@ -42,8 +43,11 @@ public class HealthCheckService {
private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckService.class.getName());
private final List providers = new ArrayList<>();
- private ExecutorService executor;
- private boolean busy = false;
+ private final Object cacheLock = new Object();
+ private volatile long cacheTimestamp = 0L;
+ private volatile List healthCache = Collections.emptyList();
+ private volatile boolean initialized = false;
+ private volatile boolean busy = false;
private boolean registered = false;
@Reference
@@ -58,7 +62,6 @@ public HealthCheckService() {
@Activate
public void activate() throws ServletException, NamespaceException {
LOGGER.info("Activating healthcheck service...");
- executor = Executors.newSingleThreadExecutor();
if (!registered) {
setConfig(config);
}
@@ -98,9 +101,6 @@ public void deactivate() {
httpService.unregister("/health/check");
registered = false;
}
- if (executor != null) {
- executor.shutdown();
- }
}
@Reference(service = HealthCheckProvider.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unbind")
@@ -114,37 +114,57 @@ protected void unbind(HealthCheckProvider provider) {
providers.remove(provider);
}
- public List check() throws RejectedExecutionException {
- if (config !=null && config.isEnabled()) {
- LOGGER.debug("Health check called");
- if (busy) {
- throw new RejectedExecutionException("Health check already in progress");
- } else {
- try {
+ public List check() {
+ if (config == null || !config.isEnabled()) {
+ LOGGER.warn("Healthcheck service is disabled");
+ return Collections.emptyList();
+ }
+ if (!initialized) {
+ synchronized (cacheLock) {
+ if (!initialized) {
+ refreshCache();
+ initialized = true;
+ }
+ }
+ } else if (shouldRefreshCache()) {
+ synchronized (cacheLock) {
+ if (!busy) {
busy = true;
- List health = new ArrayList<>();
- health.add(HealthCheckResponse.live("karaf"));
- for (HealthCheckProvider provider : providers.stream().filter(p -> config.getEnabledProviders().contains(p.name())).collect(Collectors.toList())) {
- Future future = executor.submit(provider::execute);
- try {
- HealthCheckResponse response = future.get(config.getTimeout(), TimeUnit.MILLISECONDS);
- health.add(response);
- } catch (TimeoutException e) {
- future.cancel(true);
- health.add(provider.timeout());
- } catch (Exception e) {
- LOGGER.error("Error while executing health check", e);
- }
+ try {
+ refreshCache();
+ } finally {
+ busy = false;
}
- return health;
- } finally {
- busy = false;
}
}
- } else {
- LOGGER.info("Healthcheck service is disabled");
- return Collections.emptyList();
}
+ return healthCache;
+ }
+
+ private boolean shouldRefreshCache() {
+ return !busy && (System.currentTimeMillis() - cacheTimestamp) > 1000;
}
+ private void refreshCache() {
+ try {
+ List health = new ArrayList<>();
+ health.add(HealthCheckResponse.live("karaf"));
+ for (HealthCheckProvider provider : providers.stream()
+ .filter(p -> config.getEnabledProviders().contains(p.name()))
+ .toList()) {
+ try {
+ HealthCheckResponse response = provider.execute();
+ health.add(response);
+ } catch (Exception e) {
+ LOGGER.error("Error while executing health check", e);
+ health.add(provider.timeout());
+ }
+ }
+ health.sort(Comparator.comparing(HealthCheckResponse::getName));
+ healthCache = List.copyOf(health);
+ cacheTimestamp = System.currentTimeMillis();
+ } catch (Exception e) {
+ LOGGER.error("Error refreshing health cache", e);
+ }
+ }
}
diff --git a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java
index 27dec31d4..b25deb56c 100644
--- a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java
+++ b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java
@@ -32,7 +32,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
-import java.util.Comparator;
import java.util.List;
/**
@@ -66,7 +65,6 @@ protected void service(HttpServletRequest request, HttpServletResponse response)
return;
}
List checks = service.check();
- checks.sort(Comparator.comparing(HealthCheckResponse::getName));
response.getWriter().println(mapper.writeValueAsString(checks));
response.setContentType("application/json");
response.setHeader("Cache-Control", "no-cache");
diff --git a/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java b/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java
index 6acde9fe2..7d01aaf57 100644
--- a/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java
@@ -22,6 +22,7 @@
import org.apache.unomi.api.Profile;
import org.apache.unomi.api.PropertyType;
import org.apache.unomi.api.rules.Rule;
+import org.apache.unomi.api.services.EventService;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.junit.After;
import org.junit.Assert;
@@ -166,10 +167,13 @@ private Event sendCopyPropertyEvent(Map properties, String profi
Event event = new Event("copyProperties", null, profile, null, null, profile, new Date());
event.setPersistent(false);
-
event.setProperty("urlParameters", properties);
-
- eventService.send(event);
+ int result = eventService.send(event);
+ LOGGER.info("Event processing result: {}", result);
+ if (result == EventService.ERROR) {
+ LOGGER.error("Event processing resulted in ERROR. Event details: {}", event);
+ }
+ Assert.assertNotEquals(EventService.ERROR, result);
return event;
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java b/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java
index 755d184c2..882259525 100644
--- a/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java
@@ -33,8 +33,10 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.*;
import static org.junit.Assert.fail;
@@ -68,6 +70,42 @@ public void testHealthCheck() {
}
}
+ @Test
+ public void testConcurrentHealthCheck() {
+ final int NB_THREADS = 10;
+ final int NB_ITERATIONS = 20;
+
+ ExecutorService executorService = null;
+ try {
+ executorService = Executors.newFixedThreadPool(NB_THREADS);
+ List>> futures = new ArrayList<>();
+ for (int i = 0; i < NB_ITERATIONS; i++) {
+ for (int j = 0; j < NB_THREADS; j++) {
+ Future> future = executorService.submit(() -> get(HEALTHCHECK_ENDPOINT, new TypeReference<>() {}));
+ futures.add(future);
+ }
+ for (Future> future : futures) {
+ List health = future.get(10, TimeUnit.SECONDS);
+ Assert.assertEquals(4, health.size());
+ Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("karaf") && r.getStatus() == HealthCheckResponse.Status.LIVE));
+ Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals(searchEngine) && r.getStatus() == HealthCheckResponse.Status.LIVE));
+ Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("unomi") && r.getStatus() == HealthCheckResponse.Status.LIVE));
+ Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("cluster") && r.getStatus() == HealthCheckResponse.Status.LIVE));
+ }
+ Thread.sleep(10);
+ }
+ executorService.shutdown();
+ Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS));
+ } catch (Exception e) {
+ LOGGER.error("Error while executing concurrent health check", e);
+ fail("Error while executing concurrent health check: " + e.getMessage());
+ } finally {
+ if ( executorService != null ) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
protected T get(final String url, TypeReference typeReference) {
CloseableHttpResponse response = null;
try {
diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
index be82d3109..72576a847 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java
@@ -20,6 +20,7 @@
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
+import org.apache.unomi.api.services.EventService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -27,6 +28,8 @@
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.*;
@@ -36,6 +39,8 @@
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
public class ProfileMergeIT extends BaseIT {
+ private final static Logger LOGGER = LoggerFactory.getLogger(ProfileMergeIT.class);
+
public static final String PERSONALIZATION_STRATEGY_STATUS = "personalizationStrategyStatus";
public static final String PERSONALIZATION_STRATEGY_STATUS_ID = "personalizationId";
public static final String PERSONALIZATION_STRATEGY_STATUS_IN_CTRL_GROUP = "inControlGroup";
@@ -468,7 +473,11 @@ private Event sendEvent() {
profile.setProperty("j:nodename", "michel");
profile.getSystemProperties().put("mergeIdentifier", "jose");
Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date());
- eventService.send(testEvent);
+ int result = eventService.send(testEvent);
+ LOGGER.info("Event processing result: {}", result);
+ if (result == EventService.ERROR) {
+ LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent);
+ }
return testEvent;
}
diff --git a/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java
index 821158f19..3a2086f1b 100644
--- a/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java
@@ -23,6 +23,7 @@
import org.apache.unomi.api.actions.Action;
import org.apache.unomi.api.conditions.Condition;
import org.apache.unomi.api.rules.Rule;
+import org.apache.unomi.api.services.EventService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -30,6 +31,8 @@
import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerSuite;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Date;
@@ -38,6 +41,7 @@
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerSuite.class)
public class SendEventActionIT extends BaseIT {
+ private final static Logger LOGGER = LoggerFactory.getLogger(SendEventActionIT.class);
private final static String TEST_RULE_ID = "sendEventTest";
private final static String EVENT_ID = "sendEventTestId";
@@ -80,7 +84,11 @@ private Event sendEvent() {
profile.setProperty("j:nodename", "michel");
Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date());
testEvent.setItemId(EVENT_ID);
- eventService.send(testEvent);
+ int result = eventService.send(testEvent);
+ LOGGER.info("Event processing result: {}", result);
+ if (result == EventService.ERROR) {
+ LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent);
+ }
return testEvent;
}
diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java
index 87a89c1b1..a5881c37e 100644
--- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java
+++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java
@@ -50,7 +50,7 @@
* to dynamically adjust its behavior based on external configurations. It leverages the {@link FeaturesService} to
* manage Karaf features dynamically.
*
- * Configuration
+ * Configuration
* The service reads its configuration from the OSGi Configuration Admin under the PID org.apache.unomi.start.
* The configuration includes:
*
@@ -58,14 +58,14 @@
* in the format persistenceImplementation:feature1,feature2.
*
*
- * Usage
+ * Usage
* This service can be controlled programmatically through its methods:
*
* - {@link #startUnomi(String, boolean)}: Installs and starts features for the specified start features configuration.
* - {@link #stopUnomi()}: Stops and uninstalls the previously started features.
*
*
- * Dependencies
+ * Dependencies
* The following dependencies are required for this service:
*
* - {@link MigrationService}: Handles migration tasks during startup.