diff --git a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java index 9da585e14..08c679b4b 100644 --- a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java +++ b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/HealthCheckService.java @@ -26,9 +26,10 @@ import org.slf4j.LoggerFactory; import javax.servlet.ServletException; -import java.util.*; -import java.util.concurrent.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import static org.apache.unomi.healthcheck.HealthCheckConfig.CONFIG_AUTH_REALM; @@ -42,8 +43,11 @@ public class HealthCheckService { private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckService.class.getName()); private final List providers = new ArrayList<>(); - private ExecutorService executor; - private boolean busy = false; + private final Object cacheLock = new Object(); + private volatile long cacheTimestamp = 0L; + private volatile List healthCache = Collections.emptyList(); + private volatile boolean initialized = false; + private volatile boolean busy = false; private boolean registered = false; @Reference @@ -58,7 +62,6 @@ public HealthCheckService() { @Activate public void activate() throws ServletException, NamespaceException { LOGGER.info("Activating healthcheck service..."); - executor = Executors.newSingleThreadExecutor(); if (!registered) { setConfig(config); } @@ -98,9 +101,6 @@ public void deactivate() { httpService.unregister("/health/check"); registered = false; } - if (executor != null) { - executor.shutdown(); - } } @Reference(service = HealthCheckProvider.class, cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC, unbind = "unbind") @@ -114,37 +114,57 @@ protected void unbind(HealthCheckProvider provider) { providers.remove(provider); } - public List check() throws RejectedExecutionException { - if (config !=null && config.isEnabled()) { - LOGGER.debug("Health check called"); - if (busy) { - throw new RejectedExecutionException("Health check already in progress"); - } else { - try { + public List check() { + if (config == null || !config.isEnabled()) { + LOGGER.warn("Healthcheck service is disabled"); + return Collections.emptyList(); + } + if (!initialized) { + synchronized (cacheLock) { + if (!initialized) { + refreshCache(); + initialized = true; + } + } + } else if (shouldRefreshCache()) { + synchronized (cacheLock) { + if (!busy) { busy = true; - List health = new ArrayList<>(); - health.add(HealthCheckResponse.live("karaf")); - for (HealthCheckProvider provider : providers.stream().filter(p -> config.getEnabledProviders().contains(p.name())).collect(Collectors.toList())) { - Future future = executor.submit(provider::execute); - try { - HealthCheckResponse response = future.get(config.getTimeout(), TimeUnit.MILLISECONDS); - health.add(response); - } catch (TimeoutException e) { - future.cancel(true); - health.add(provider.timeout()); - } catch (Exception e) { - LOGGER.error("Error while executing health check", e); - } + try { + refreshCache(); + } finally { + busy = false; } - return health; - } finally { - busy = false; } } - } else { - LOGGER.info("Healthcheck service is disabled"); - return Collections.emptyList(); } + return healthCache; + } + + private boolean shouldRefreshCache() { + return !busy && (System.currentTimeMillis() - cacheTimestamp) > 1000; } + private void refreshCache() { + try { + List health = new ArrayList<>(); + health.add(HealthCheckResponse.live("karaf")); + for (HealthCheckProvider provider : providers.stream() + .filter(p -> config.getEnabledProviders().contains(p.name())) + .toList()) { + try { + HealthCheckResponse response = provider.execute(); + health.add(response); + } catch (Exception e) { + LOGGER.error("Error while executing health check", e); + health.add(provider.timeout()); + } + } + health.sort(Comparator.comparing(HealthCheckResponse::getName)); + healthCache = List.copyOf(health); + cacheTimestamp = System.currentTimeMillis(); + } catch (Exception e) { + LOGGER.error("Error refreshing health cache", e); + } + } } diff --git a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java index 27dec31d4..b25deb56c 100644 --- a/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java +++ b/extensions/healthcheck/src/main/java/org/apache/unomi/healthcheck/servlet/HealthCheckServlet.java @@ -32,7 +32,6 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.util.Comparator; import java.util.List; /** @@ -66,7 +65,6 @@ protected void service(HttpServletRequest request, HttpServletResponse response) return; } List checks = service.check(); - checks.sort(Comparator.comparing(HealthCheckResponse::getName)); response.getWriter().println(mapper.writeValueAsString(checks)); response.setContentType("application/json"); response.setHeader("Cache-Control", "no-cache"); diff --git a/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java b/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java index 6acde9fe2..7d01aaf57 100644 --- a/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/CopyPropertiesActionIT.java @@ -22,6 +22,7 @@ import org.apache.unomi.api.Profile; import org.apache.unomi.api.PropertyType; import org.apache.unomi.api.rules.Rule; +import org.apache.unomi.api.services.EventService; import org.apache.unomi.persistence.spi.CustomObjectMapper; import org.junit.After; import org.junit.Assert; @@ -166,10 +167,13 @@ private Event sendCopyPropertyEvent(Map properties, String profi Event event = new Event("copyProperties", null, profile, null, null, profile, new Date()); event.setPersistent(false); - event.setProperty("urlParameters", properties); - - eventService.send(event); + int result = eventService.send(event); + LOGGER.info("Event processing result: {}", result); + if (result == EventService.ERROR) { + LOGGER.error("Event processing resulted in ERROR. Event details: {}", event); + } + Assert.assertNotEquals(EventService.ERROR, result); return event; } diff --git a/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java b/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java index 755d184c2..882259525 100644 --- a/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/HealthCheckIT.java @@ -33,8 +33,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.*; import static org.junit.Assert.fail; @@ -68,6 +70,42 @@ public void testHealthCheck() { } } + @Test + public void testConcurrentHealthCheck() { + final int NB_THREADS = 10; + final int NB_ITERATIONS = 20; + + ExecutorService executorService = null; + try { + executorService = Executors.newFixedThreadPool(NB_THREADS); + List>> futures = new ArrayList<>(); + for (int i = 0; i < NB_ITERATIONS; i++) { + for (int j = 0; j < NB_THREADS; j++) { + Future> future = executorService.submit(() -> get(HEALTHCHECK_ENDPOINT, new TypeReference<>() {})); + futures.add(future); + } + for (Future> future : futures) { + List health = future.get(10, TimeUnit.SECONDS); + Assert.assertEquals(4, health.size()); + Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("karaf") && r.getStatus() == HealthCheckResponse.Status.LIVE)); + Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals(searchEngine) && r.getStatus() == HealthCheckResponse.Status.LIVE)); + Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("unomi") && r.getStatus() == HealthCheckResponse.Status.LIVE)); + Assert.assertTrue(health.stream().anyMatch(r -> r.getName().equals("cluster") && r.getStatus() == HealthCheckResponse.Status.LIVE)); + } + Thread.sleep(10); + } + executorService.shutdown(); + Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS)); + } catch (Exception e) { + LOGGER.error("Error while executing concurrent health check", e); + fail("Error while executing concurrent health check: " + e.getMessage()); + } finally { + if ( executorService != null ) { + executorService.shutdownNow(); + } + } + } + protected T get(final String url, TypeReference typeReference) { CloseableHttpResponse response = null; try { diff --git a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java index be82d3109..72576a847 100644 --- a/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/ProfileMergeIT.java @@ -20,6 +20,7 @@ import org.apache.unomi.api.actions.Action; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.rules.Rule; +import org.apache.unomi.api.services.EventService; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -27,6 +28,8 @@ import org.ops4j.pax.exam.junit.PaxExam; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerSuite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.*; @@ -36,6 +39,8 @@ @RunWith(PaxExam.class) @ExamReactorStrategy(PerSuite.class) public class ProfileMergeIT extends BaseIT { + private final static Logger LOGGER = LoggerFactory.getLogger(ProfileMergeIT.class); + public static final String PERSONALIZATION_STRATEGY_STATUS = "personalizationStrategyStatus"; public static final String PERSONALIZATION_STRATEGY_STATUS_ID = "personalizationId"; public static final String PERSONALIZATION_STRATEGY_STATUS_IN_CTRL_GROUP = "inControlGroup"; @@ -468,7 +473,11 @@ private Event sendEvent() { profile.setProperty("j:nodename", "michel"); profile.getSystemProperties().put("mergeIdentifier", "jose"); Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date()); - eventService.send(testEvent); + int result = eventService.send(testEvent); + LOGGER.info("Event processing result: {}", result); + if (result == EventService.ERROR) { + LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent); + } return testEvent; } diff --git a/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java index 821158f19..3a2086f1b 100644 --- a/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java +++ b/itests/src/test/java/org/apache/unomi/itests/SendEventActionIT.java @@ -23,6 +23,7 @@ import org.apache.unomi.api.actions.Action; import org.apache.unomi.api.conditions.Condition; import org.apache.unomi.api.rules.Rule; +import org.apache.unomi.api.services.EventService; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -30,6 +31,8 @@ import org.ops4j.pax.exam.junit.PaxExam; import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy; import org.ops4j.pax.exam.spi.reactors.PerSuite; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.Date; @@ -38,6 +41,7 @@ @RunWith(PaxExam.class) @ExamReactorStrategy(PerSuite.class) public class SendEventActionIT extends BaseIT { + private final static Logger LOGGER = LoggerFactory.getLogger(SendEventActionIT.class); private final static String TEST_RULE_ID = "sendEventTest"; private final static String EVENT_ID = "sendEventTestId"; @@ -80,7 +84,11 @@ private Event sendEvent() { profile.setProperty("j:nodename", "michel"); Event testEvent = new Event(TEST_EVENT_TYPE, null, profile, null, null, profile, new Date()); testEvent.setItemId(EVENT_ID); - eventService.send(testEvent); + int result = eventService.send(testEvent); + LOGGER.info("Event processing result: {}", result); + if (result == EventService.ERROR) { + LOGGER.error("Event processing resulted in ERROR. Event details: {}", testEvent); + } return testEvent; } diff --git a/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java b/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java index 87a89c1b1..a5881c37e 100644 --- a/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java +++ b/tools/shell-commands/src/main/java/org/apache/unomi/shell/services/internal/UnomiManagementServiceImpl.java @@ -50,7 +50,7 @@ * to dynamically adjust its behavior based on external configurations. It leverages the {@link FeaturesService} to * manage Karaf features dynamically.

* - *

Configuration

+ *

Configuration

*

The service reads its configuration from the OSGi Configuration Admin under the PID org.apache.unomi.start. * The configuration includes:

*
    @@ -58,14 +58,14 @@ * in the format persistenceImplementation:feature1,feature2. *
* - *

Usage

+ *

Usage

*

This service can be controlled programmatically through its methods:

*
    *
  • {@link #startUnomi(String, boolean)}: Installs and starts features for the specified start features configuration.
  • *
  • {@link #stopUnomi()}: Stops and uninstalls the previously started features.
  • *
* - *

Dependencies

+ *

Dependencies

*

The following dependencies are required for this service:

*
    *
  • {@link MigrationService}: Handles migration tasks during startup.