From d7c60364e2e83fa6b9255504c515170e497eb2ef Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 15:50:18 +0000 Subject: [PATCH 01/15] handle recursive task errors incl self-ref config address BROOKLYN-329 case (2), where a config key is defined as a function of itself detect the endless loop that results and fail with a good message also better handling in general of endless-loop task failures, including: * bail-out logic in Exceptions.collapse to prevent crazy long strings and traces * warning whenever active tasks passes N*1000 --- .../spi/dsl/methods/DslComponent.java | 19 ++++++-- .../camp/brooklyn/ConfigYamlTest.java | 47 ++++++++++++++++++- .../util/core/task/BasicExecutionManager.java | 5 +- .../util/core/task/ValueResolver.java | 4 +- .../brooklyn/util/exceptions/Exceptions.java | 36 +++++++++++++- 5 files changed, 104 insertions(+), 7 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 8c0fe3bbc2..98466be571 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -47,6 +47,7 @@ import org.apache.brooklyn.util.core.task.DeferredSupplier; import org.apache.brooklyn.util.core.task.ImmediateSupplier; import org.apache.brooklyn.util.core.task.TaskBuilder; +import org.apache.brooklyn.util.core.task.TaskTags; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.groovy.GroovyJavaMethods; @@ -502,8 +503,8 @@ public final Maybe getImmediately() { if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available"); EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); - ConfigKey key = (ConfigKey) targetEntity.getEntityType().getConfigKey(keyName); - Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); + ConfigKey key = targetEntity.getEntityType().getConfigKey(keyName); + Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); return Maybe.cast(result); } @@ -517,7 +518,19 @@ public Task newTask() { @Override public Object call() throws Exception { Entity targetEntity = component.get(); - ConfigKey key = (ConfigKey) targetEntity.getEntityType().getConfigKey(keyName); + + String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')"; + Task ancestor = Tasks.current(); + while (ancestor!=null) { + if (TaskTags.hasTag(ancestor, tag)) { + throw new IllegalStateException("Recursive config reference "+tag); + } + ancestor = ancestor.getSubmittedByTask(); + } + + Tasks.addTagDynamically(tag); + + ConfigKey key = targetEntity.getEntityType().getConfigKey(keyName); return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); }}) .build(); diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java index 07fc36a85e..013a6f72fe 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java @@ -28,8 +28,14 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -44,7 +50,6 @@ public class ConfigYamlTest extends AbstractYamlTest { - @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class); private ExecutorService executor; @@ -91,6 +96,46 @@ public void testConfigInConfigBlock() throws Exception { assertNull(entity.getMyField()); // field with @SetFromFlag assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias } + + + @Test + public void testRecursiveConfigFailsGracefully() throws Exception { + String yaml = Joiner.on("\n").join( + "services:", + "- type: org.apache.brooklyn.core.test.entity.TestEntity", + " brooklyn.config:", + " infinite_loop: $brooklyn:config(\"infinite_loop\")"); + + final Entity app = createStartWaitAndLogApplication(yaml); + TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + Time.sleep(Duration.FIVE_SECONDS); + // error, loop wasn't interrupted or detected + LOG.warn("Timeout elapsed, destroying items; usage: "+ + ((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString()); + //Entities.destroy(app); + } catch (RuntimeInterruptedException e) { + // expected on normal execution + Thread.interrupted(); + } + } + }); + t.start(); + try { + String c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop")); + Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c); + } catch (Exception e) { + Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive"); + } finally { + if (!Entities.isManaged(app)) { + t.interrupt(); + } + } + } @Test public void testConfigAtTopLevel() throws Exception { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 51f1b67a6a..b6ad0415b9 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -753,7 +753,10 @@ protected void beforeStartAtomicTask(Map flags, Task task) { /** invoked in a task's thread when a task is starting to run (may be some time after submitted), * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ protected void internalBeforeStart(Map flags, Task task) { - activeTaskCount.incrementAndGet(); + int count = activeTaskCount.incrementAndGet(); + if (count % 1000==0) { + log.warn("High number of active tasks: task #"+count+" is "+task); + } //set thread _before_ start time, so we won't get a null thread when there is a start-time if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task + " running on thread " + Thread.currentThread().getName()); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index f81594eba8..8c61c452ba 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -493,7 +493,9 @@ public Object call() throws Exception { } catch (Exception e) { Exceptions.propagateIfFatal(e); - IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e); + String msg = "Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec; + String eTxt = Exceptions.collapseText(e); + IllegalArgumentException problem = eTxt.startsWith(msg) ? new IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e); if (swallowExceptions) { if (log.isDebugEnabled()) log.debug("Resolution of "+this+" failed, swallowing and returning: "+e); diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java index ee49b4abde..3b41cf6587 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Predicates.instanceOf; -import static com.google.common.base.Throwables.getCausalChain; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; @@ -55,6 +54,8 @@ public class Exceptions { private static final List> BORING_IF_NO_MESSAGE_THROWABLE_SUPERTYPES = ImmutableList.>of( PropagatedRuntimeException.class); + public static final int MAX_COLLAPSE_RECURSIVE_DEPTH = 100; + /** NB: might be useful for stack trace, e.g. {@link ExecutionException} */ private static boolean isBoringForMessage(Throwable t) { for (Class type: ALWAYS_BORING_MESSAGE_THROWABLE_SUPERTYPES) @@ -263,8 +264,41 @@ public static Throwable collapseIncludingAllCausalMessages(Throwable source) { public static Throwable collapse(Throwable source, boolean collapseCausalChain) { return collapse(source, collapseCausalChain, false, ImmutableSet.of(), new Object[0]); } + + /** As {@link Throwables#getCausalChain(Throwable)} but safe in the face of perverse classes which return themselves as their cause or otherwise have a recursive causal chain. */ + public static List getCausalChain(Throwable t) { + Set result = MutableSet.of(); + while (t!=null) { + if (!result.add(t)) break; + t = t.getCause(); + } + return ImmutableList.copyOf(result); + } + + private static boolean isCausalChainDepthExceeding(Throwable t, int size) { + if (size<0) return true; + if (t==null) return false; + return isCausalChainDepthExceeding(t.getCause(), size-1); + } private static Throwable collapse(Throwable source, boolean collapseCausalChain, boolean includeAllCausalMessages, Set visited, Object contexts[]) { + if (visited.isEmpty()) { + if (isCausalChainDepthExceeding(source, MAX_COLLAPSE_RECURSIVE_DEPTH)) { + // do fast check above, then do deeper check which survives recursive causes + List chain = getCausalChain(source); + if (chain.size() > MAX_COLLAPSE_RECURSIVE_DEPTH) { + // if it's an OOME or other huge stack, shrink it so we don't spin huge cycles processing the trace and printing it + // (sometimes generating subsequent OOME's in logback that mask the first!) + // coarse heuristic for how to reduce it, but that's better than killing cpu, causing further errors, and suppressing the root cause altogether! + String msg = chain.get(0).getMessage(); + if (msg.length() > 512) msg = msg.substring(0, 500)+"..."; + return new PropagatedRuntimeException("Huge stack trace (size "+chain.size()+", removing all but last few), " + + "starting: "+chain.get(0).getClass().getName()+": "+msg+"; ultimately caused by: ", + chain.get(chain.size() - 10)); + } + } + } + visited = MutableSet.copyOf(visited); String message = ""; Throwable collapsed = source; From 4121554d8664ee11ed989428e0e84992267da634 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 16:09:20 +0000 Subject: [PATCH 02/15] add (failing) test re config loop and immediate evaluation --- .../brooklyn/camp/brooklyn/ConfigYamlTest.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java index 013a6f72fe..64700defb3 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java @@ -100,6 +100,16 @@ public void testConfigInConfigBlock() throws Exception { @Test public void testRecursiveConfigFailsGracefully() throws Exception { + doTestRecursiveConfigFailsGracefully(false); + } + + // TODO this test fails because entities aren't available when evaluating immediately + @Test + public void testRecursiveConfigImmediateFailsGracefully() throws Exception { + doTestRecursiveConfigFailsGracefully(true); + } + + protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception { String yaml = Joiner.on("\n").join( "services:", "- type: org.apache.brooklyn.core.test.entity.TestEntity", @@ -126,7 +136,13 @@ public void run() { }); t.start(); try { - String c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop")); + String c; + if (immediate) { + // this should throw rather than return "absent", because the error is definitive (absent means couldn't resolve in time) + c = entity.config().getNonBlocking(ConfigKeys.newStringConfigKey("infinite_loop")).or("FAILED"); + } else { + c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop")); + } Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c); } catch (Exception e) { Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive"); From 5324f8212aac0aee1deac9b47155e37b1120c78a Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 16:26:41 +0000 Subject: [PATCH 03/15] better logging and reporting if no entity available for immediate eval --- .../brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 98466be571..1f704b39d0 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -216,7 +216,7 @@ protected Maybe getEntity(boolean immediate) { return Maybe.of(scopeComponent.get()); } } else { - return Maybe.of(entity()); + return Maybe.ofDisallowingNull(entity()).or(Maybe.absent("Context entity not available when trying to evaluate Brooklyn DSL")); } } @@ -500,9 +500,8 @@ public DslConfigSupplier(DslComponent component, String keyName) { @Override public final Maybe getImmediately() { Maybe targetEntityMaybe = component.getImmediately(); - if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available"); + if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); - ConfigKey key = targetEntity.getEntityType().getConfigKey(keyName); Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); return Maybe.cast(result); From a6796829e89f4239a5480622abd1d166485c766c Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Thu, 17 Nov 2016 15:11:14 +0000 Subject: [PATCH 04/15] flesh out test cases around non-blocking evaluation --- .../util/core/task/ImmediateSupplier.java | 5 +- .../task/InterruptingImmediateSupplier.java | 73 ++++++++ .../core/entity/EntityConfigTest.java | 174 ++++++++++++++---- 3 files changed, 210 insertions(+), 42 deletions(-) create mode 100644 core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java index ac0aae4e4e..ef9d648dce 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java @@ -27,7 +27,8 @@ public interface ImmediateSupplier { /** - * Indicates that we are unable to get the value immediately, because that is not supported + * Indicates that a supplier does not support immediate evaluation, + * i.e. it may need to block to evaluate even if there is a value available * (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}. */ public static class ImmediateUnsupportedException extends UnsupportedOperationException { @@ -44,7 +45,7 @@ public ImmediateUnsupportedException(String message, Throwable cause) { /** * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available. * - * @throws ImmediateUnsupportedException if cannot determinte the value immediately + * @throws ImmediateUnsupportedException if cannot determine whether a value is immediately available */ Maybe getImmediately(); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java new file mode 100644 index 0000000000..c29b458f5d --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.core.task; + +import java.util.concurrent.Semaphore; + +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.guava.Maybe; + +import com.google.common.base.Supplier; + +/** + * Wraps a {@link Supplier} as an {@link ImmediateSupplier} by interrupting the thread before calling {@link Supplier#get()}. + * If the call succeeds, the result is returned. + * If the call throws any trace including an {@link InterruptedException} or {@link RuntimeInterruptedException} + * (ie the call failed due to the interruption, typically because it tried to wait) + * then this class concludes that there is no value available immediately and returns {@link Maybe#absent()}. + * If the call throws any other error, that is returned. + * The interruption is cleared afterwards (unless the thread was interrupted when the method was entered). + *

+ * Note that some "immediate" methods, such as {@link Semaphore#acquire()} when a semaphore is available, + * will throw if the thread is interrupted. Typically there are workarounds, for instance: + * if (semaphore.tryAcquire()) semaphore.acquire();. + */ +public class InterruptingImmediateSupplier implements ImmediateSupplier, DeferredSupplier { + + final Supplier nestedSupplier; + + public InterruptingImmediateSupplier(Supplier nestedSupplier) { + this.nestedSupplier = nestedSupplier; + } + + @Override + public Maybe getImmediately() { + boolean interrupted = Thread.currentThread().isInterrupted(); + try { + if (!interrupted) Thread.currentThread().interrupt(); + return Maybe.ofAllowingNull(get()); + } catch (Throwable t) { + if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null || + Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null) { + return Maybe.absent("Immediate value not available"); + } + throw Exceptions.propagate(t); + } finally { + if (!interrupted) Thread.interrupted(); + } + } + + @Override + public T get() { + return nestedSupplier.get(); + } + + +} diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 38f5f90301..636707136d 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +37,7 @@ import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.ConfigPredicates; @@ -47,13 +49,17 @@ import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.DeferredSupplier; +import org.apache.brooklyn.util.core.task.InterruptingImmediateSupplier; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -63,6 +69,8 @@ public class EntityConfigTest extends BrooklynAppUnitTestSupport { + private static final Logger log = LoggerFactory.getLogger(EntityConfigTest.class); + private static final int TIMEOUT_MS = 10*1000; private ExecutorService executor; @@ -244,59 +252,145 @@ public void testGetConfigMapWithSubValueAsStringNotCoerced() throws Exception { // of the previous "test.confMapThing.obj". // // Presumably an earlier call to task.get() timed out, causing it to cancel the task? + // Alex: yes, a task.cancel is performed for maps in + // AbstractEntity$BasicConfigurationSupport(AbstractConfigurationSupportInternal).getNonBlockingResolvingStructuredKey(ConfigKey) + + // // I (Aled) question whether we want to support passing a task (rather than a // DeferredSupplier or TaskFactory, for example). Our EntitySpec.configure is overloaded // to take a Task, but that feels wrong!? - @Test(groups="Broken") - public void testGetTaskNonBlocking() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - Task task = Tasks.builder().body( + // + // If starting clean I (Alex) would agree, we should use TaskFactory. However the + // DependentConfiguration methods -- including the ubiquitous AttributeWhenReady -- + // return Task instances so they should survive a getNonBlocking or get with a short timeout + // access, and if a value is subsequently available it should be returned + // (which this test asserts, but is currently failing). If TaskFactory is used the + // intended semantics are clear -- you create a new task on each access, and can interrupt it + // and discard it if needed. For a Task it's less clear: probably the semantics are that the + // first returned value is what the value is forevermore. Probably it should not be interrupted + // on a non-blocking / short-wait access, or possibly it should simply be re-run if a previous + // execution was interrupted (but take care if we have a simultaneous non-blocking and blocking + // access, if the first one interrupts the second one should still get a value). + // I tend to think ideally we should switch to using TaskFactory in DependentConfiguration. + class ConfigNonBlockingFixture { + final Semaphore latch = new Semaphore(0); + final String expectedVal = "myval"; + Object blockingVal; + + protected ConfigNonBlockingFixture usingTask() { + blockingVal = taskFactory().newTask(); + return this; + } + + protected ConfigNonBlockingFixture usingTaskFactory() { + blockingVal = taskFactory(); + return this; + } + + protected ConfigNonBlockingFixture usingDeferredSupplier() { + blockingVal = deferredSupplier(); + return this; + } + + protected ConfigNonBlockingFixture usingImmediateSupplier() { + blockingVal = new InterruptingImmediateSupplier(deferredSupplier()); + return this; + } + + private TaskFactory> taskFactory() { + return Tasks.builder().body( new Callable() { @Override public String call() throws Exception { - latch.await(); + if (!latch.tryAcquire()) latch.acquire(); + latch.release(); return "myval"; }}) - .build(); - runGetConfigNonBlocking(latch, task, "myval"); - } - - @Test - public void testGetDeferredSupplierNonBlocking() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - DeferredSupplier task = new DeferredSupplier() { - @Override public String get() { - try { - latch.await(); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - return "myval"; - } - }; - runGetConfigNonBlocking(latch, task, "myval"); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - protected void runGetConfigNonBlocking(CountDownLatch latch, Object blockingVal, String expectedVal) throws Exception { - TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) - .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal)) - .configure((ConfigKey)TestEntity.CONF_NAME, blockingVal)); - - // Will initially return absent, because task is not done - assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent()); - assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent()); + .buildFactory(); + } - latch.countDown(); + private DeferredSupplier deferredSupplier() { + return new DeferredSupplier() { + @Override public String get() { + try { + log.info("acquiring"); + if (!latch.tryAcquire()) latch.acquire(); + latch.release(); + log.info("acquired and released"); + } catch (InterruptedException e) { + log.info("interrupted"); + throw Exceptions.propagate(e); + } + return "myval"; + } + }; + } - // Can now finish task, so will return expectedVal - assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal)); - assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal); + protected void runGetConfigNonBlockingInKey() throws Exception { + Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this"); + + @SuppressWarnings("unchecked") + TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) + .configure((ConfigKey)(ConfigKey)TestEntity.CONF_NAME, blockingVal)); + + log.info("get non-blocking"); + // Will initially return absent, because task is not done + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_NAME).isAbsent()); + log.info("got absent"); + + latch.release(); + + // Can now finish task, so will return expectedVal + log.info("get blocking"); + assertEquals(entity.config().get(TestEntity.CONF_NAME), expectedVal); + log.info("got blocking"); + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), expectedVal); + + latch.acquire(); + log.info("finished"); + } - assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal)); - assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal); + protected void runGetConfigNonBlockingInMap() throws Exception { + Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this"); + + TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal))); + + // Will initially return absent, because task is not done + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent()); + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent()); + + latch.release(); + + // Can now finish task, so will return expectedVal + assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal)); + assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal); + + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal)); + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal); + } } + @Test public void testGetTaskNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey(); } + @Test public void testGetTaskNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap(); } + + @Test public void testGetTaskFactoryNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); } + @Test public void testGetTaskFactoryNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap(); } + + @Test public void testGetSupplierNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey(); } + @Test public void testGetSuppierNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap(); } + + @Test public void testGetImmediateSupplierNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey(); } + @Test public void testGetImmediateSupplierNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap(); } + @Test public void testGetConfigKeysReturnsFromSuperAndInterfacesAndSubClass() throws Exception { MySubEntity entity = app.addChild(EntitySpec.create(MySubEntity.class)); From faeeb1bdfc065b7e44554f45cca6a3f87074efb1 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 22:15:10 +0000 Subject: [PATCH 05/15] immediate execution runs in a fake tag allowing context to be evaluated most new immediate tests now passing, including a new test which detects recursive config values for immediate; except we still have: * cancellations of immediate execution goes too far, and cancels tasks which are set as values * task factories still not supported for evaluation --- .../brooklyn/api/mgmt/ExecutionContext.java | 3 ++ .../spi/dsl/methods/DslComponent.java | 31 ++++++++++--------- .../brooklyn/core/mgmt/BrooklynTaskTags.java | 2 ++ .../AbstractConfigurationSupportInternal.java | 1 - .../util/core/task/BasicExecutionContext.java | 31 ++++++++++++++++++- .../task/InterruptingImmediateSupplier.java | 3 +- .../util/core/task/ValueResolver.java | 4 +-- 7 files changed, 56 insertions(+), 19 deletions(-) diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java index 4540240ec3..d8e538c774 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java @@ -24,6 +24,7 @@ import java.util.concurrent.Executor; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.util.guava.Maybe; /** * This is a Brooklyn extension to the Java {@link Executor}. @@ -64,4 +65,6 @@ public interface ExecutionContext extends Executor { boolean isShutdown(); + Maybe getImmediately(Object callableOrSupplier); + } \ No newline at end of file diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 1f704b39d0..80e202ddf5 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -503,6 +503,7 @@ public final Maybe getImmediately() { if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); ConfigKey key = targetEntity.getEntityType().getConfigKey(keyName); + checkAndTagForRecursiveReference(targetEntity); Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); return Maybe.cast(result); } @@ -517,22 +518,24 @@ public Task newTask() { @Override public Object call() throws Exception { Entity targetEntity = component.get(); - - String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')"; - Task ancestor = Tasks.current(); - while (ancestor!=null) { - if (TaskTags.hasTag(ancestor, tag)) { - throw new IllegalStateException("Recursive config reference "+tag); - } - ancestor = ancestor.getSubmittedByTask(); - } - - Tasks.addTagDynamically(tag); - + checkAndTagForRecursiveReference(targetEntity); ConfigKey key = targetEntity.getEntityType().getConfigKey(keyName); return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyName)); - }}) - .build(); + } + }).build(); + } + + private void checkAndTagForRecursiveReference(Entity targetEntity) { + String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')"; + Task ancestor = Tasks.current(); + while (ancestor!=null) { + if (TaskTags.hasTag(ancestor, tag)) { + throw new IllegalStateException("Recursive config reference "+tag); + } + ancestor = ancestor.getSubmittedByTask(); + } + + Tasks.addTagDynamically(tag); } @Override diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java index 39b2f70d92..8f4fa144d2 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java @@ -78,6 +78,8 @@ public class BrooklynTaskTags extends TaskTags { * and that it need not appear in some task lists; * often used for framework lifecycle events and sensor polling */ public static final String TRANSIENT_TASK_TAG = "TRANSIENT"; + /** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */ + public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE"; // ------------- entity tags ------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java index ce10c86957..796ab137da 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java @@ -146,7 +146,6 @@ protected Maybe getNonBlockingResolvingSimple(ConfigKey key) { .immediately(true) .deep(true) .context(getContext()) - .swallowExceptions() .get(); return (resolved != marker) ? TypeCoercions.tryCoerce(resolved, key.getTypeToken()) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 38f1b5a4ce..f23053b6ba 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -41,10 +41,12 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; /** @@ -96,7 +98,34 @@ public ExecutionManager getExecutionManager() { /** returns tasks started by this context (or tasks which have all the tags on this object) */ @Override public Set> getTasks() { return executionManager.getTasksWithAllTags(tags); } - + + /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context */ + @SuppressWarnings("unchecked") + @Override + public Maybe getImmediately(Object callableOrSupplier) { + BasicTask fakeTaskForContext = new BasicTask(MutableMap.of("displayName", "immediate evaluation")); + fakeTaskForContext.tags.addAll(tags); + fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG); + fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); + + Task previousTask = BasicExecutionManager.getPerThreadCurrentTask().get(); + if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask); + try { + BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext); + + if ((callableOrSupplier instanceof Supplier) && !(callableOrSupplier instanceof ImmediateSupplier)) { + callableOrSupplier = new InterruptingImmediateSupplier<>((Supplier)callableOrSupplier); + } + if (callableOrSupplier instanceof ImmediateSupplier) { + return ((ImmediateSupplier)callableOrSupplier).getImmediately(); + } + // TODO could add more types here + throw new IllegalArgumentException("Type "+callableOrSupplier.getClass()+" not supported for getImmediately (instance "+callableOrSupplier+")"); + } finally { + BasicExecutionManager.getPerThreadCurrentTask().set(previousTask); + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected Task submitInternal(Map propertiesQ, final Object task) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java index c29b458f5d..c478f5e15f 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -18,6 +18,7 @@ */ package org.apache.brooklyn.util.core.task; +import java.util.NoSuchElementException; import java.util.concurrent.Semaphore; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -56,7 +57,7 @@ public Maybe getImmediately() { } catch (Throwable t) { if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null || Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null) { - return Maybe.absent("Immediate value not available"); + return Maybe.absent(new UnsupportedOperationException("Immediate value not available", t)); } throw Exceptions.propagate(t); } finally { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index 8c61c452ba..ec7eb010b7 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -357,9 +357,9 @@ protected Maybe getMaybeInternal() { try { if (immediately && v instanceof ImmediateSupplier) { - final ImmediateSupplier supplier = (ImmediateSupplier) v; + final ImmediateSupplier supplier = (ImmediateSupplier) v; try { - Maybe result = supplier.getImmediately(); + Maybe result = exec.getImmediately(supplier); // Recurse: need to ensure returned value is cast, etc return (result.isPresent()) From f84d886337de242450097fec06ec62ccaf1fe807 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 22:34:46 +0000 Subject: [PATCH 06/15] solve problem with map eval where tasks are cancelled permanently sets non-transient if a task is requested for a value-resolver we might want to deprecate that altogether, instead use TaskFactory so we can cancel things don't think there will be much leaking because the ValueResolver isn't used for new tasks, just for tasks which are set as values -- but we need to keep an eye on that. such tasks should be cancelled when the entities are cleaned up. --- .../util/core/task/BasicExecutionManager.java | 2 +- .../util/core/task/DynamicSequentialTask.java | 6 +++-- .../brooklyn/util/core/task/TaskInternal.java | 3 +++ .../util/core/task/ValueResolver.java | 13 +++++++--- .../core/entity/EntityConfigTest.java | 25 ++++++++++++------- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index b6ad0415b9..8b59498907 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -591,7 +591,7 @@ public boolean cancel(TaskCancellationMode mode) { if (!task.isCancelled()) result |= ((TaskInternal)task).cancel(mode); result |= delegate().cancel(mode.isAllowedToInterruptTask()); - if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) { + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { int subtasksFound=0; int subtasksReallyCancelled=0; diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java index 1b421b0b25..2869ff98de 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java @@ -160,9 +160,11 @@ public void queue(Task t) { @Override protected boolean doCancel(TaskCancellationMode mode) { boolean result = false; - if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) { - for (Task t: secondaryJobsAll) + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { + for (Task t: secondaryJobsAll) { + // secondary jobs are dependent result = ((TaskInternal)t).cancel(mode) || result; + } } return super.doCancel(mode) || result; // returns true if anything is successfully cancelled diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java index 99c2773d64..f565aa06b3 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java @@ -143,6 +143,9 @@ private TaskCancellationMode(boolean mayInterruptIfRunning, boolean interruptSub this.allowedToInterruptTask = mayInterruptIfRunning; this.allowedToInterruptDependentSubmittedTasks = interruptSubmittedTransients; this.allowedToInterruptAllSubmittedTasks = interruptAllSubmitted; + + // if dependent isn't set, then all shouldn't be set + assert !(this.allowedToInterruptAllSubmittedTasks && !this.allowedToInterruptDependentSubmittedTasks); } public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index ec7eb010b7..672fef4396 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -375,10 +375,17 @@ protected Maybe getMaybeInternal() { //if it's a task or a future, we wait for the task to complete if (v instanceof TaskAdaptable) { //if it's a task, we make sure it is submitted - if (!((TaskAdaptable) v).asTask().isSubmitted() ) { - if (exec==null) + Task task = ((TaskAdaptable) v).asTask(); + if (!task.isSubmitted()) { + if (exec==null) { return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available"); - exec.submit(((TaskAdaptable) v).asTask()); + } + if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { + // mark this non-transient, because this value is usually something set e.g. in config + // (ideally we'd discourage this in favour of task factories which can be transiently interrupted) + BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); + } + exec.submit(task); } } diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 636707136d..820fc14e2c 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -352,7 +352,6 @@ protected void runGetConfigNonBlockingInKey() throws Exception { protected void runGetConfigNonBlockingInMap() throws Exception { Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this"); - TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal))); @@ -371,24 +370,32 @@ protected void runGetConfigNonBlockingInMap() throws Exception { } } - @Test public void testGetTaskNonBlockingKey() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskNonBlockingKey() throws Exception { new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey(); } - @Test public void testGetTaskNonBlockingMap() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskNonBlockingMap() throws Exception { new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap(); } - @Test public void testGetTaskFactoryNonBlockingKey() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskFactoryNonBlockingKey() throws Exception { new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); } - @Test public void testGetTaskFactoryNonBlockingMap() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskFactoryNonBlockingMap() throws Exception { new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap(); } - @Test public void testGetSupplierNonBlockingKey() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetSupplierNonBlockingKey() throws Exception { new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey(); } - @Test public void testGetSuppierNonBlockingMap() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetSuppierNonBlockingMap() throws Exception { new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap(); } - @Test public void testGetImmediateSupplierNonBlockingKey() throws Exception { + @Test // fast + public void testGetImmediateSupplierNonBlockingKey() throws Exception { new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey(); } - @Test public void testGetImmediateSupplierNonBlockingMap() throws Exception { + @Test(groups="Integration") // because takes 1s+ + public void testGetImmediateSupplierNonBlockingMap() throws Exception { new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap(); } @Test From 49f0e225f8196c9d2314afe52cffbc1839cdfcf6 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 6 Dec 2016 22:45:14 +0000 Subject: [PATCH 07/15] cleanup, and allow TaskFactory to be supplied as a config and other ValueResolver input the TF will create a task which will then be used for evaluation. much cleaner semantics than setting tasks as values: tasks evaluate once and remember their result, whereas task factory spawns a new task each time. furthermore, the former cannot be interrupted without making the value _never_ resolvable (which was the case prior to the previous commit) so it is left running if immediate eval is done, whereas the latter can be safely cancelled. --- .../spi/dsl/methods/DslComponent.java | 16 +++++++-- .../camp/brooklyn/spi/dsl/DslTest.java | 11 ++++--- .../internal/AbstractConfigMapImpl.java | 3 +- .../util/core/task/BasicExecutionContext.java | 16 ++++----- .../util/core/task/ImmediateSupplier.java | 8 +++-- .../task/InterruptingImmediateSupplier.java | 33 +++++++++++++++++-- .../brooklyn/util/core/task/TaskTags.java | 1 + .../util/core/task/ValueResolver.java | 24 +++++++++++--- .../core/entity/EntityConfigTest.java | 20 +++++------ .../util/core/task/ValueResolverTest.java | 29 ++++++++++------ 10 files changed, 114 insertions(+), 47 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 80e202ddf5..e7457945f7 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -203,6 +203,15 @@ public Maybe getImmediately() { } } + @Override + public Entity get() { + try { + return call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + @Override public Entity call() throws Exception { return callImpl(false).get(); @@ -304,10 +313,11 @@ protected Maybe callImpl(boolean immediate) throws Exception { return Maybe.of(result.get()); } - // TODO may want to block and repeat on new entities joining? - throw new NoSuchElementException("No entity matching id " + desiredComponentId+ + // could be nice if DSL has an extra .block() method to allow it to wait for a matching entity. + // previously we threw if nothing existed; now we return an absent with a detailed error + return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+ (scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+ - (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))); + (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")))); } private ExecutionContext getExecutionContext() { diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java index 514a7887d6..170b799c4e 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java @@ -48,6 +48,7 @@ import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Identifiers; import org.apache.brooklyn.util.time.Duration; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception { @Test public void testEntityNotFound() throws Exception { BrooklynDslDeferredSupplier dsl = BrooklynDslCommon.entity("myIdDoesNotExist"); + Maybe actualValue = execDslImmediately(dsl, Entity.class, app, true); + Assert.assertTrue(actualValue.isAbsent()); try { - Maybe actualValue = execDslImmediately(dsl, Entity.class, app, true); + actualValue.get(); Asserts.shouldHaveFailedPreviously("actual="+actualValue); } catch (Exception e) { - NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class); - if (nsee == null) { - throw e; - } + Asserts.expectedFailureOfType(e, NoSuchElementException.class); } } @@ -365,6 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) { return this; } + @SuppressWarnings("unused") // kept in case useful for additional tests public DslTestWorker wrapInTaskForImmediately(boolean val) { wrapInTaskForImmediately = val; return this; diff --git a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java index 2d92617516..b736beb27e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.config.ConfigInheritance; import org.apache.brooklyn.config.ConfigInheritances; @@ -231,7 +232,7 @@ public Maybe getConfigRaw(ConfigKey key, boolean includeInherited) { } protected Object coerceConfigVal(ConfigKey key, Object v) { - if ((v instanceof Future) || (v instanceof DeferredSupplier)) { + if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) { // no coercion for these (coerce on exit) return v; } else if (key instanceof StructuredConfigKey) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index f23053b6ba..6c69509eda 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; -import com.google.common.base.Supplier; import com.google.common.collect.Iterables; /** @@ -99,7 +98,8 @@ public ExecutionManager getExecutionManager() { @Override public Set> getTasks() { return executionManager.getTasksWithAllTags(tags); } - /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context */ + /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context; + * currently supports suppliers or callables */ @SuppressWarnings("unchecked") @Override public Maybe getImmediately(Object callableOrSupplier) { @@ -110,17 +110,15 @@ public Maybe getImmediately(Object callableOrSupplier) { Task previousTask = BasicExecutionManager.getPerThreadCurrentTask().get(); if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask); + fakeTaskForContext.cancel(); try { BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext); - if ((callableOrSupplier instanceof Supplier) && !(callableOrSupplier instanceof ImmediateSupplier)) { - callableOrSupplier = new InterruptingImmediateSupplier<>((Supplier)callableOrSupplier); + if (!(callableOrSupplier instanceof ImmediateSupplier)) { + callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier); } - if (callableOrSupplier instanceof ImmediateSupplier) { - return ((ImmediateSupplier)callableOrSupplier).getImmediately(); - } - // TODO could add more types here - throw new IllegalArgumentException("Type "+callableOrSupplier.getClass()+" not supported for getImmediately (instance "+callableOrSupplier+")"); + return ((ImmediateSupplier)callableOrSupplier).getImmediately(); + } finally { BasicExecutionManager.getPerThreadCurrentTask().set(previousTask); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java index ef9d648dce..5ec8d68d1b 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java @@ -20,11 +20,13 @@ import org.apache.brooklyn.util.guava.Maybe; +import com.google.common.base.Supplier; + /** - * A class that supplies objects of a single type, without blocking for any significant length - * of time. + * A {@link Supplier} that has an extra method capable of supplying a value immediately or an absent if definitely not available, + * or throwing an {@link ImmediateUnsupportedException} if it cannot determine whether a value is immediately available. */ -public interface ImmediateSupplier { +public interface ImmediateSupplier extends Supplier { /** * Indicates that a supplier does not support immediate evaluation, diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java index c478f5e15f..ced001e17c 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -18,13 +18,14 @@ */ package org.apache.brooklyn.util.core.task; -import java.util.NoSuchElementException; +import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; +import com.google.common.annotations.Beta; import com.google.common.base.Supplier; /** @@ -40,6 +41,7 @@ * will throw if the thread is interrupted. Typically there are workarounds, for instance: * if (semaphore.tryAcquire()) semaphore.acquire();. */ +@Beta public class InterruptingImmediateSupplier implements ImmediateSupplier, DeferredSupplier { final Supplier nestedSupplier; @@ -69,6 +71,33 @@ public Maybe getImmediately() { public T get() { return nestedSupplier.get(); } - + + @SuppressWarnings("unchecked") + public static InterruptingImmediateSupplier of(final Object o) { + if (o instanceof Supplier) { + return new InterruptingImmediateSupplier((Supplier)o); + } else if (o instanceof Callable) { + return new InterruptingImmediateSupplier(new Supplier() { + @Override + public T get() { + try { + return ((Callable)o).call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + }); + } else if (o instanceof Runnable) { + return new InterruptingImmediateSupplier(new Supplier() { + @Override + public T get() { + ((Runnable)o).run(); + return null; + } + }); + } else { + throw new UnsupportedOperationException("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")"); + } + } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java index 6b64a6b0f6..4319796810 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java @@ -62,6 +62,7 @@ public static boolean isInessential(Task task) { } public static boolean hasTag(Task task, Object tag) { + if (task==null) return false; return task.getTags().contains(tag); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index 672fef4396..6644a9a16b 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -29,6 +29,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.core.flags.TypeCoercions; @@ -322,6 +323,10 @@ public Maybe getMaybe() { return result; } + protected boolean isEvaluatingImmediately() { + return immediately || BrooklynTaskTags.hasTag(Tasks.current(), BrooklynTaskTags.IMMEDIATE_TASK_TAG); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) protected Maybe getMaybeInternal() { if (started.getAndSet(true)) @@ -352,11 +357,11 @@ protected Maybe getMaybeInternal() { //if the expected type is a closure or map and that's what we have, we're done (or if it's null); //but not allowed to return a future or DeferredSupplier as the resolved value - if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v))) + if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v) && !TaskFactory.class.isInstance(v))) return Maybe.of((T) v); try { - if (immediately && v instanceof ImmediateSupplier) { + if (isEvaluatingImmediately() && v instanceof ImmediateSupplier) { final ImmediateSupplier supplier = (ImmediateSupplier) v; try { Maybe result = exec.getImmediately(supplier); @@ -366,12 +371,23 @@ protected Maybe getMaybeInternal() { ? recursive ? new ValueResolver(result.get(), type, this).getMaybe() : result - : Maybe.absent(); + : result; } catch (ImmediateSupplier.ImmediateUnsupportedException e) { log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e); } } + // TODO if evaluating immediately should use a new ExecutionContext.submitImmediate(...) + // and sets a timeout but which wraps a task but does not spawn a new thread + + if ((v instanceof TaskFactory) && !(v instanceof DeferredSupplier)) { + v = ((TaskFactory)v).newTask(); + BrooklynTaskTags.setTransient(((TaskAdaptable)v).asTask()); + if (isEvaluatingImmediately()) { + BrooklynTaskTags.addTagDynamically( ((TaskAdaptable)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG ); + } + } + //if it's a task or a future, we wait for the task to complete if (v instanceof TaskAdaptable) { //if it's a task, we make sure it is submitted @@ -382,7 +398,7 @@ protected Maybe getMaybeInternal() { } if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { // mark this non-transient, because this value is usually something set e.g. in config - // (ideally we'd discourage this in favour of task factories which can be transiently interrupted) + // (should discourage this in favour of task factories which can be transiently interrupted?) BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); } exec.submit(task); diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 820fc14e2c..672924f4c1 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -313,12 +313,12 @@ private DeferredSupplier deferredSupplier() { return new DeferredSupplier() { @Override public String get() { try { - log.info("acquiring"); + log.trace("acquiring"); if (!latch.tryAcquire()) latch.acquire(); latch.release(); - log.info("acquired and released"); + log.trace("acquired and released"); } catch (InterruptedException e) { - log.info("interrupted"); + log.trace("interrupted"); throw Exceptions.propagate(e); } return "myval"; @@ -333,21 +333,21 @@ protected void runGetConfigNonBlockingInKey() throws Exception { TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) .configure((ConfigKey)(ConfigKey)TestEntity.CONF_NAME, blockingVal)); - log.info("get non-blocking"); + log.trace("get non-blocking"); // Will initially return absent, because task is not done assertTrue(entity.config().getNonBlocking(TestEntity.CONF_NAME).isAbsent()); - log.info("got absent"); + log.trace("got absent"); latch.release(); // Can now finish task, so will return expectedVal - log.info("get blocking"); + log.trace("get blocking"); assertEquals(entity.config().get(TestEntity.CONF_NAME), expectedVal); - log.info("got blocking"); + log.trace("got blocking"); assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), expectedVal); latch.acquire(); - log.info("finished"); + log.trace("finished"); } protected void runGetConfigNonBlockingInMap() throws Exception { @@ -526,7 +526,7 @@ public String call() { assertEquals(getConfigFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), "abc"); } - @Test + @Test(groups="Integration") // takes 0.5s public void testGetConfigWithExecutedTaskWaitsForResult() throws Exception { LatchingCallable work = new LatchingCallable("abc"); Task task = executionManager.submit(work); @@ -548,7 +548,7 @@ public String call() { assertEquals(work.callCount.get(), 1); } - @Test + @Test(groups="Integration") // takes 0.5s public void testGetConfigWithUnexecutedTaskIsExecutedAndWaitsForResult() throws Exception { LatchingCallable work = new LatchingCallable("abc"); Task task = new BasicTask(work); diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java index e47e4c9bbc..358f39d17c 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java @@ -20,7 +20,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.fail; import java.util.Arrays; @@ -137,19 +136,17 @@ public void testDefaultBeforeDelayAndError() { assertMaybeIsAbsent(result); Assert.assertEquals(result.get(), "foo"); } - + public void testGetImmediately() { MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get(); - assertNull(callInfo.task); - assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately"); + assertImmediateFakeTaskFromMethod(callInfo, "testGetImmediately"); } public void testImmediateSupplierWithTimeoutUsesBlocking() { MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get(); - assertNotNull(callInfo.task); - assertNotContainsCallingMethod(callInfo.stackTrace, "testImmediateSupplierWithTimeoutUsesBlocking"); + assertRealTaskNotFromMethod(callInfo, "testImmediateSupplierWithTimeoutUsesBlocking"); } public void testGetImmediatelyInTask() throws Exception { @@ -164,16 +161,14 @@ private CallInfo myUniquelyNamedMethod() { } }); CallInfo callInfo = task.get(); - assertEquals(callInfo.task, task); - assertContainsCallingMethod(callInfo.stackTrace, "myUniquelyNamedMethod"); + assertImmediateFakeTaskFromMethod(callInfo, "myUniquelyNamedMethod"); } public void testGetImmediatelyFallsBackToDeferredCallInTask() throws Exception { final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(true); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get(); - assertNotNull(callInfo.task); + assertRealTaskNotFromMethod(callInfo, "testGetImmediatelyFallsBackToDeferredCallInTask"); assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app); - assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediatelyFallsBackToDeferredCallInTask"); } public void testNonRecursiveBlockingFailsOnNonObjectType() throws Exception { @@ -359,4 +354,18 @@ private void assertNotContainsCallingMethod(StackTraceElement[] stackTrace, Stri } } } + + private void assertImmediateFakeTaskFromMethod(CallInfo callInfo, String method) { + // previously task was null, but now there is a "fake task" + assertNotNull(callInfo.task); + Assert.assertFalse(callInfo.task.isSubmitted()); + assertContainsCallingMethod(callInfo.stackTrace, method); + } + + private void assertRealTaskNotFromMethod(CallInfo callInfo, String method) { + assertNotNull(callInfo.task); + Assert.assertTrue(callInfo.task.isSubmitted()); + assertNotContainsCallingMethod(callInfo.stackTrace, method); + } + } From 7476d3b5f830d80fdd872ccfd70c8f0f7bf98015 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Sat, 18 Feb 2017 01:30:55 +0000 Subject: [PATCH 08/15] add many of the code review comments --- .../brooklyn/api/mgmt/ExecutionContext.java | 14 ++ .../camp/brooklyn/ConfigYamlTest.java | 5 +- .../task/InterruptingImmediateSupplier.java | 2 +- .../core/entity/EntityConfigTest.java | 24 ++-- .../InterruptingImmediateSupplierTest.java | 133 ++++++++++++++++++ .../util/exceptions/ExceptionsTest.java | 20 +++ 6 files changed, 186 insertions(+), 12 deletions(-) create mode 100644 core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java index d8e538c774..f8a963a8d3 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java @@ -26,6 +26,8 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.util.guava.Maybe; +import com.google.common.annotations.Beta; + /** * This is a Brooklyn extension to the Java {@link Executor}. * @@ -65,6 +67,18 @@ public interface ExecutionContext extends Executor { boolean isShutdown(); + /** + * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available. + * It may throw an error if it cannot be determined whether a value is available immediately or not. + *

+ * Implementations will typically attempt to execute in the current thread, with appropriate + * tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but + * if needed they may block. + *

+ * Supports {@link Callable} and {@link Runnable} targets to be evaluated with "immediate" semantics. + */ + // TODO reference ImmediateSupplier when that class is moved to utils project + @Beta Maybe getImmediately(Object callableOrSupplier); } \ No newline at end of file diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java index 64700defb3..1686f55d9f 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java @@ -103,7 +103,6 @@ public void testRecursiveConfigFailsGracefully() throws Exception { doTestRecursiveConfigFailsGracefully(false); } - // TODO this test fails because entities aren't available when evaluating immediately @Test public void testRecursiveConfigImmediateFailsGracefully() throws Exception { doTestRecursiveConfigFailsGracefully(true); @@ -127,9 +126,9 @@ public void run() { // error, loop wasn't interrupted or detected LOG.warn("Timeout elapsed, destroying items; usage: "+ ((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString()); - //Entities.destroy(app); + Entities.destroy(app); } catch (RuntimeInterruptedException e) { - // expected on normal execution + // expected on normal execution; clear the interrupted flag to prevent ugly further warnings being logged Thread.interrupted(); } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java index ced001e17c..a92a641a78 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -44,7 +44,7 @@ @Beta public class InterruptingImmediateSupplier implements ImmediateSupplier, DeferredSupplier { - final Supplier nestedSupplier; + private final Supplier nestedSupplier; public InterruptingImmediateSupplier(Supplier nestedSupplier) { this.nestedSupplier = nestedSupplier; diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 672924f4c1..4882b7c0a8 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -372,31 +372,39 @@ protected void runGetConfigNonBlockingInMap() throws Exception { @Test(groups="Integration") // because takes 1s+ public void testGetTaskNonBlockingKey() throws Exception { - new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey(); } + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey(); + } @Test(groups="Integration") // because takes 1s+ public void testGetTaskNonBlockingMap() throws Exception { - new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap(); } + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap(); + } @Test(groups="Integration") // because takes 1s+ public void testGetTaskFactoryNonBlockingKey() throws Exception { - new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); } + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); + } @Test(groups="Integration") // because takes 1s+ public void testGetTaskFactoryNonBlockingMap() throws Exception { - new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap(); } + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap(); + } @Test(groups="Integration") // because takes 1s+ public void testGetSupplierNonBlockingKey() throws Exception { - new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey(); } + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey(); + } @Test(groups="Integration") // because takes 1s+ public void testGetSuppierNonBlockingMap() throws Exception { - new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap(); } + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap(); + } @Test // fast public void testGetImmediateSupplierNonBlockingKey() throws Exception { - new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey(); } + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey(); + } @Test(groups="Integration") // because takes 1s+ public void testGetImmediateSupplierNonBlockingMap() throws Exception { - new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap(); } + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap(); + } @Test public void testGetConfigKeysReturnsFromSuperAndInterfacesAndSubClass() throws Exception { diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java new file mode 100644 index 0000000000..fe83225816 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.core.task; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.testng.annotations.Test; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.Callables; +import com.google.common.util.concurrent.Runnables; + +public class InterruptingImmediateSupplierTest { + + @Test(expectedExceptions=UnsupportedOperationException.class) + public void testOfInvalidType() throws Exception { + InterruptingImmediateSupplier.of("myval"); + } + + @Test + public void testRunnable() throws Exception { + assertImmediatelyPresent(Runnables.doNothing(), null); + assertImmediatelyAbsent(new SleepingRunnable()); + assertImmediatelyFails(new FailingRunnable(), MarkerException.class); + } + + @Test + public void testCallable() throws Exception { + assertImmediatelyPresent(Callables.returning("myval"), "myval"); + assertImmediatelyAbsent(new SleepingCallable()); + assertImmediatelyFails(new FailingCallable(), MarkerException.class); + } + + @Test + public void testSupplier() throws Exception { + assertImmediatelyPresent(Suppliers.ofInstance("myval"), "myval"); + assertImmediatelyAbsent(new SleepingSupplier()); + assertImmediatelyFails(new FailingSupplier(), MarkerException.class); + } + + private void assertImmediatelyPresent(Object orig, Object expected) { + Maybe result = getImmediately(orig); + assertEquals(result.get(), expected); + assertFalse(Thread.currentThread().isInterrupted()); + } + + private void assertImmediatelyAbsent(Object orig) { + Maybe result = getImmediately(orig); + assertTrue(result.isAbsent(), "result="+result); + assertFalse(Thread.currentThread().isInterrupted()); + } + + private void assertImmediatelyFails(Object orig, Class expected) { + try { + Maybe result = getImmediately(orig); + Asserts.shouldHaveFailedPreviously("result="+result); + } catch (Exception e) { + Asserts.expectedFailureOfType(e, expected); + } + assertFalse(Thread.currentThread().isInterrupted()); + } + + private Maybe getImmediately(Object val) { + InterruptingImmediateSupplier supplier = InterruptingImmediateSupplier.of(val); + return supplier.getImmediately(); + } + + public static class SleepingRunnable implements Runnable { + @Override public void run() { + Time.sleep(Duration.ONE_MINUTE); + } + } + + public static class SleepingCallable implements Callable { + @Override public Void call() { + Time.sleep(Duration.ONE_MINUTE); + return null; + } + } + + public static class SleepingSupplier implements Supplier { + @Override public Void get() { + Time.sleep(Duration.ONE_MINUTE); + return null; + } + } + + public static class FailingRunnable implements Runnable { + @Override public void run() { + throw new MarkerException(); + } + } + + public static class FailingCallable implements Callable { + @Override public Void call() { + throw new MarkerException(); + } + } + + public static class FailingSupplier implements Supplier { + @Override public Void get() { + throw new MarkerException(); + } + } + + public static class MarkerException extends RuntimeException { + private static final long serialVersionUID = -3395361406478634652L; + } +} \ No newline at end of file diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java index 65b5d91248..272d3f6df2 100644 --- a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java +++ b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java @@ -34,6 +34,7 @@ import org.testng.annotations.Test; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; public class ExceptionsTest { @@ -306,6 +307,25 @@ public void testNestedExecAndProp() { Assert.assertEquals(Exceptions.collapseText(t), "IOException: 1"); } + @Test + public void testGetCausalChain() throws Exception { + Exception e1 = new Exception("e1"); + Exception e2 = new Exception("e2", e1); + assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1)); + } + + @Test + public void testGetCausalChainRecursive() throws Exception { + Exception e1 = new Exception("e1") { + private static final long serialVersionUID = 1L; + public synchronized Throwable getCause() { + return this; + } + }; + Exception e2 = new Exception("e2", e1); + assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1)); + } + @Test public void testComplexJcloudsExample() { Throwable t; From b0733494ce0ef307235ff055bfb0f4815736e876 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Sat, 18 Feb 2017 11:53:11 +0000 Subject: [PATCH 09/15] tweak self-ref check strategy to make idempotent --- .../camp/brooklyn/spi/dsl/methods/DslComponent.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 7d9a95107e..bc93c05e9e 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -553,7 +553,6 @@ public final Maybe getImmediately() { if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); checkAndTagForRecursiveReference(targetEntity); - String keyNameS = resolveKeyName(true); ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS)); @@ -582,6 +581,13 @@ public Object call() throws Exception { private void checkAndTagForRecursiveReference(Entity targetEntity) { String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')"; Task ancestor = Tasks.current(); + if (ancestor!=null) { + // don't check on ourself; only look higher in hierarchy; + // this assumes impls always spawn new tasks (which they do, just maybe not always in new threads) + // but it means it does not rely on tag removal to prevent weird errors, + // and more importantly it makes the strategy idempotent + ancestor = ancestor.getSubmittedByTask(); + } while (ancestor!=null) { if (TaskTags.hasTag(ancestor, tag)) { throw new IllegalStateException("Recursive config reference "+tag); From 0aa29efc6750575427628b025dd84dc8aa8020e6 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Sat, 18 Feb 2017 14:12:12 +0000 Subject: [PATCH 10/15] use a task so tag is always applied in a dedicated context so it doesn't need to be removed the previous attempt at idempotency could set the tag too broadly, e.g. when evaluating K1, then subsequently looking at a K2 that refers to K1, the latter would think it's recursed inside the former --- .../spi/dsl/methods/DslComponent.java | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index bc93c05e9e..1cce90e678 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -549,14 +549,11 @@ protected String resolveKeyName(boolean immediately) { @Override public final Maybe getImmediately() { - Maybe targetEntityMaybe = component.getImmediately(); - if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); - EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); - checkAndTagForRecursiveReference(targetEntity); - String keyNameS = resolveKeyName(true); - ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); - Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS)); - return Maybe.cast(result); + Maybe maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallable(true)); + // the answer will be wrapped twice due to the callable semantics; + // the inner present/absent is important; it will only get an outer absent if interrupted + if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe; + return Maybe.cast( (Maybe) maybeWrappedMaybe.get() ); } @Override @@ -565,17 +562,35 @@ public Task newTask() { .displayName("retrieving config for "+keyName) .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) .dynamic(false) - .body(new Callable() { - @Override - public Object call() throws Exception { - Entity targetEntity = component.get(); - checkAndTagForRecursiveReference(targetEntity); - - String keyNameS = resolveKeyName(true); - ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); - return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS)); - } - }).build(); + .body(newCallable(false)).build(); + } + + private Callable newCallable(final boolean immediate) { + return new Callable() { + @Override + public Object call() throws Exception { + Entity targetEntity; + if (immediate) { + Maybe targetEntityMaybe = component.getImmediately(); + if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); + targetEntity = (EntityInternal) targetEntityMaybe.get(); + } else { + targetEntity = component.get(); + } + + // this is always run in a new dedicated task (possibly a fake task if immediate), so no need to clear + checkAndTagForRecursiveReference(targetEntity); + + String keyNameS = resolveKeyName(true); + ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); + if (key==null) key = ConfigKeys.newConfigKey(Object.class, keyNameS); + if (immediate) { + return ((EntityInternal)targetEntity).config().getNonBlocking(key); + } else { + return targetEntity.getConfig(key); + } + } + }; } private void checkAndTagForRecursiveReference(Entity targetEntity) { From cd3d4864aa2a59a18f28997313ca07bc9185fd62 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Sat, 18 Feb 2017 16:31:26 +0000 Subject: [PATCH 11/15] ensure TaskFactory items evaluated immediately don't leak long-running tasks --- .../brooklyn/api/mgmt/ExecutionContext.java | 4 +- .../util/core/task/BasicExecutionContext.java | 16 +++- .../task/InterruptingImmediateSupplier.java | 26 ++++-- .../util/core/task/ValueResolver.java | 88 ++++++++++++++----- .../util/core/task/ValueResolverTest.java | 51 +++++++++++ 5 files changed, 151 insertions(+), 34 deletions(-) diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java index f8a963a8d3..344907a45a 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java @@ -75,10 +75,10 @@ public interface ExecutionContext extends Executor { * tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but * if needed they may block. *

- * Supports {@link Callable} and {@link Runnable} targets to be evaluated with "immediate" semantics. + * Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics. */ // TODO reference ImmediateSupplier when that class is moved to utils project @Beta - Maybe getImmediately(Object callableOrSupplier); + Maybe getImmediately(Object callableOrSupplierOrTask); } \ No newline at end of file diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 6c69509eda..0799607331 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -41,6 +41,7 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException; import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,7 +104,20 @@ public ExecutionManager getExecutionManager() { @SuppressWarnings("unchecked") @Override public Maybe getImmediately(Object callableOrSupplier) { - BasicTask fakeTaskForContext = new BasicTask(MutableMap.of("displayName", "immediate evaluation")); + BasicTask fakeTaskForContext; + if (callableOrSupplier instanceof BasicTask) { + fakeTaskForContext = (BasicTask)callableOrSupplier; + if (fakeTaskForContext.isQueuedOrSubmitted()) { + if (fakeTaskForContext.isDone()) { + return Maybe.of((T)fakeTaskForContext.getUnchecked()); + } else { + throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext); + } + } + callableOrSupplier = fakeTaskForContext.getJob(); + } else { + fakeTaskForContext = new BasicTask(MutableMap.of("displayName", "immediate evaluation")); + } fakeTaskForContext.tags.addAll(tags); fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG); fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java index a92a641a78..84b1bb4fee 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.ReferenceWithError; import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.guava.Maybe; @@ -72,12 +73,16 @@ public T get() { return nestedSupplier.get(); } - @SuppressWarnings("unchecked") public static InterruptingImmediateSupplier of(final Object o) { + return InterruptingImmediateSupplier.ofSafe(o).get(); + } + + @SuppressWarnings("unchecked") + public static ReferenceWithError> ofSafe(final Object o) { if (o instanceof Supplier) { - return new InterruptingImmediateSupplier((Supplier)o); + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier((Supplier)o)); } else if (o instanceof Callable) { - return new InterruptingImmediateSupplier(new Supplier() { + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() { @Override public T get() { try { @@ -86,18 +91,25 @@ public T get() { throw Exceptions.propagate(e); } } - }); + })); } else if (o instanceof Runnable) { - return new InterruptingImmediateSupplier(new Supplier() { + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() { @Override public T get() { ((Runnable)o).run(); return null; } - }); + })); } else { - throw new UnsupportedOperationException("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")"); + return ReferenceWithError.newInstanceThrowingError(null, new InterruptingImmediateSupplierNotSupportedForObject(o)); } } + public static class InterruptingImmediateSupplierNotSupportedForObject extends UnsupportedOperationException { + private static final long serialVersionUID = 307517409005386500L; + + public InterruptingImmediateSupplierNotSupportedForObject(Object o) { + super("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")"); + } + } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index 6644a9a16b..f8cb91b66c 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -361,37 +361,59 @@ protected Maybe getMaybeInternal() { return Maybe.of((T) v); try { - if (isEvaluatingImmediately() && v instanceof ImmediateSupplier) { - final ImmediateSupplier supplier = (ImmediateSupplier) v; - try { - Maybe result = exec.getImmediately(supplier); - - // Recurse: need to ensure returned value is cast, etc - return (result.isPresent()) - ? recursive - ? new ValueResolver(result.get(), type, this).getMaybe() - : result - : result; - } catch (ImmediateSupplier.ImmediateUnsupportedException e) { - log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e); + boolean allowImmediateExecution = false; + boolean bailOutAfterImmediateExecution = false; + + if (v instanceof ImmediateSupplier) { + allowImmediateExecution = true; + + } else { + if ((v instanceof TaskFactory) && !(v instanceof DeferredSupplier)) { + v = ((TaskFactory)v).newTask(); + allowImmediateExecution = true; + bailOutAfterImmediateExecution = true; + BrooklynTaskTags.setTransient(((TaskAdaptable)v).asTask()); + if (isEvaluatingImmediately()) { + // not needed if executing immediately + BrooklynTaskTags.addTagDynamically( ((TaskAdaptable)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG ); + } + } + + //if it's a task or a future, we wait for the task to complete + if (v instanceof TaskAdaptable) { + v = ((TaskAdaptable) v).asTask(); } } - // TODO if evaluating immediately should use a new ExecutionContext.submitImmediate(...) - // and sets a timeout but which wraps a task but does not spawn a new thread - - if ((v instanceof TaskFactory) && !(v instanceof DeferredSupplier)) { - v = ((TaskFactory)v).newTask(); - BrooklynTaskTags.setTransient(((TaskAdaptable)v).asTask()); - if (isEvaluatingImmediately()) { - BrooklynTaskTags.addTagDynamically( ((TaskAdaptable)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG ); + if (allowImmediateExecution && isEvaluatingImmediately()) { + // TODO could allow for everything, when evaluating immediately -- but if the target isn't safe to run again + // then we have to fail if immediate didn't work; to avoid breaking semantics we only do that for a few cases; + // might be nice to get to the point where we can break those semantics however, + // ie weakening what getImmediate supports and making it be non-blocking, so that bailOut=true is the default. + // if: v instanceof TaskFactory -- it is safe, it's a new API (but it is currently the only one supported); + // more than safe, we have to do it -- or add code here to cancel tasks -- because it spawns new tasks + // (other objects passed through here don't get cancelled, because other things might try again later; + // ie a task or future passed in here might naturally be long-running so cancelling is wrong, + // but with a task factory generated task it would leak if we submitted and didn't cancel!) + // if: v instanceof ImmediateSupplier -- it probably is safe to change to bailOut = true ? + // if: v instanceof Task or other things -- it currently isn't safe, there are places where + // we expect to getImmediate on things which don't support it nicely, + // and we rely on the blocking-short-wait behaviour, e.g. QuorumChecks in ConfigYamlTest + try { + Maybe result = execImmediate(exec, v); + if (result!=null) return result; + if (bailOutAfterImmediateExecution) { + throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v); + } + } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject o) { + // ignore, continue below + log.debug("Unable to resolve-immediately for "+description+" ("+v+", wrong type "+v.getClass()+"); falling back to executing with timeout"); } } - //if it's a task or a future, we wait for the task to complete - if (v instanceof TaskAdaptable) { + if (v instanceof Task) { //if it's a task, we make sure it is submitted - Task task = ((TaskAdaptable) v).asTask(); + Task task = (Task) v; if (!task.isSubmitted()) { if (exec==null) { return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available"); @@ -537,6 +559,24 @@ public Object call() throws Exception { } } + protected Maybe execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) { + Maybe result; + try { + result = exec.getImmediately(immediateSupplierOrImmediateTask); + } catch (ImmediateSupplier.ImmediateUnsupportedException e) { + return null; + } + // let InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject + // bet thrown, and caller who cares will catch that to know it can continue + + // Recurse: need to ensure returned value is cast, etc + return (result.isPresent()) + ? recursive + ? new ValueResolver(result.get(), type, this).getMaybe() + : result + : result; + } + protected String getDescription() { return description!=null ? description : ""+value; } diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java index 358f39d17c..550d4751af 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java @@ -24,8 +24,11 @@ import java.util.Arrays; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.test.Asserts; @@ -36,6 +39,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.util.concurrent.Callables; + /** * see also {@link TasksTest} for more tests */ @@ -219,6 +224,52 @@ public void testNonRecursiveImmediately() throws Exception { assertEquals(result.getClass(), FailingImmediateAndDeferredSupplier.class); } + public void testTaskFactoryGet() { + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(Callables.returning("myval")); + } + }; + String result = Tasks.resolving(taskFactory).as(String.class).context(app).get(); + assertEquals(result, "myval"); + } + + public void testTaskFactoryGetImmediately() { + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(Callables.returning("myval")); + } + }; + String result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).get(); + assertEquals(result, "myval"); + } + + public void testTaskFactoryGetImmediatelyDoesNotBlock() { + final AtomicBoolean executing = new AtomicBoolean(); + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(new Callable() { + public String call() { + executing.set(true); + try { + Time.sleep(Duration.ONE_MINUTE); + return "myval"; + } finally { + executing.set(false); + } + }}); + } + }; + Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe(); + Asserts.assertTrue(result.isAbsent(), "result="+result); + // the call below default times out after 30s while the task above is still running + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertFalse(executing.get()); + } + }); + } + private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier, DeferredSupplier { private final boolean failImmediately; From 99ccc0f6c8703e924c5fff5197bf1a0c6e39bc81 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Mon, 20 Feb 2017 12:38:48 +0000 Subject: [PATCH 12/15] more assertions for task cancellation --- .../core/entity/EntityConfigTest.java | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 4882b7c0a8..2f40fe9c30 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -46,10 +47,12 @@ import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.entity.stock.BasicEntity; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.DeferredSupplier; import org.apache.brooklyn.util.core.task.InterruptingImmediateSupplier; +import org.apache.brooklyn.util.core.task.TaskBuilder; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; @@ -276,6 +279,7 @@ class ConfigNonBlockingFixture { final Semaphore latch = new Semaphore(0); final String expectedVal = "myval"; Object blockingVal; + List> tasksMadeByFactory = MutableList.of(); protected ConfigNonBlockingFixture usingTask() { blockingVal = taskFactory().newTask(); @@ -298,15 +302,22 @@ protected ConfigNonBlockingFixture usingImmediateSupplier() { } private TaskFactory> taskFactory() { - return Tasks.builder().body( + final TaskBuilder tb = Tasks.builder().body( new Callable() { @Override public String call() throws Exception { if (!latch.tryAcquire()) latch.acquire(); latch.release(); return "myval"; - }}) - .buildFactory(); + }}); + return new TaskFactory>() { + @Override + public Task newTask() { + Task t = tb.build(); + tasksMadeByFactory.add(t); + return t; + } + }; } private DeferredSupplier deferredSupplier() { @@ -358,6 +369,14 @@ protected void runGetConfigNonBlockingInMap() throws Exception { // Will initially return absent, because task is not done assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent()); assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent()); + + if (blockingVal instanceof TaskFactory) { + assertAllOurConfigTasksCancelled(); + } else { + // TaskFactory tasks are cancelled, but others are not, + // things (ValueResolver?) are smart enough to know to leave it running + assertAllOurConfigTasksNotCancelled(); + } latch.release(); @@ -367,6 +386,28 @@ protected void runGetConfigNonBlockingInMap() throws Exception { assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal)); assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal); + + assertAllTasksDone(); + } + + private void assertAllOurConfigTasksNotCancelled() { + for (Task t: tasksMadeByFactory) { + Assert.assertFalse( t.isCancelled(), "Task should not have been cancelled: "+t+" - "+t.getStatusDetail(false) ); + } + } + + private void assertAllOurConfigTasksCancelled() { + // TODO added Feb 2017 - but might need an "eventually" here, if cancel is happening in a BG thread + // (but I think it is always foreground) + for (Task t: tasksMadeByFactory) { + Assert.assertTrue( t.isCancelled(), "Task should have been cancelled: "+t+" - "+t.getStatusDetail(false) ); + } + } + + private void assertAllTasksDone() { + for (Task t: tasksMadeByFactory) { + Assert.assertTrue( t.isDone(), "Task should have been done: "+t+" - "+t.getStatusDetail(false) ); + } } } @@ -381,7 +422,7 @@ public void testGetTaskNonBlockingMap() throws Exception { @Test(groups="Integration") // because takes 1s+ public void testGetTaskFactoryNonBlockingKey() throws Exception { - new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); } @Test(groups="Integration") // because takes 1s+ public void testGetTaskFactoryNonBlockingMap() throws Exception { From 2e6f11fae83170f5f5dab8f39bf1a416412f685d Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Mon, 20 Feb 2017 12:41:07 +0000 Subject: [PATCH 13/15] wrap immediate executions in an (entity) execution context --- .../apache/brooklyn/util/core/task/BasicExecutionContext.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 0799607331..a3ea321762 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -123,6 +123,9 @@ public Maybe getImmediately(Object callableOrSupplier) { fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); Task previousTask = BasicExecutionManager.getPerThreadCurrentTask().get(); + BasicExecutionContext oldExecutionContext = getCurrentExecutionContext(); + registerPerThreadExecutionContext(); + if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask); fakeTaskForContext.cancel(); try { @@ -135,6 +138,7 @@ public Maybe getImmediately(Object callableOrSupplier) { } finally { BasicExecutionManager.getPerThreadCurrentTask().set(previousTask); + perThreadExecutionContext.set(oldExecutionContext); } } From 46021142e7b063ca8b6da9690cdb27280e6e491c Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Tue, 28 Feb 2017 17:13:15 +0000 Subject: [PATCH 14/15] fixes and comments from code review --- .../spi/dsl/methods/DslComponent.java | 6 +-- .../util/core/task/BasicExecutionContext.java | 8 +++- .../brooklyn/util/core/task/CompoundTask.java | 15 ++++++ .../task/InterruptingImmediateSupplier.java | 4 +- .../util/core/task/ValueResolver.java | 24 +++++----- .../util/core/task/ValueResolverTest.java | 46 +++++++++++++++++++ 6 files changed, 87 insertions(+), 16 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 1cce90e678..867c108dd3 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -549,7 +549,7 @@ protected String resolveKeyName(boolean immediately) { @Override public final Maybe getImmediately() { - Maybe maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallable(true)); + Maybe maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallableReturningImmediateMaybeOrNonImmediateValue(true)); // the answer will be wrapped twice due to the callable semantics; // the inner present/absent is important; it will only get an outer absent if interrupted if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe; @@ -562,10 +562,10 @@ public Task newTask() { .displayName("retrieving config for "+keyName) .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) .dynamic(false) - .body(newCallable(false)).build(); + .body(newCallableReturningImmediateMaybeOrNonImmediateValue(false)).build(); } - private Callable newCallable(final boolean immediate) { + private Callable newCallableReturningImmediateMaybeOrNonImmediateValue(final boolean immediate) { return new Callable() { @Override public Object call() throws Exception { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index a3ea321762..2ff4dc820f 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -28,6 +28,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionContext; @@ -100,7 +101,12 @@ public ExecutionManager getExecutionManager() { public Set> getTasks() { return executionManager.getTasksWithAllTags(tags); } /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context; - * currently supports suppliers or callables */ + * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances; + * with tasks if it is submitted or in progress, + * it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and + * uses that; with such a job, or any other callable/supplier/runnable, it runs that + * in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if + * given a task) set temporarily in the current thread context */ @SuppressWarnings("unchecked") @Override public Maybe getImmediately(Object callableOrSupplier) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java index 72dfb442f4..db1722981e 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java @@ -128,4 +128,19 @@ public List> getChildren() { return (List) getChildrenTyped(); } + @Override + protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) { + boolean result = false; + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { + for (Task t: getChildren()) { + if (!t.isDone()) { + result = ((TaskInternal)t).cancel(mode) || result; + } + } + } + result = super.doCancel(mode) || result; + return result; + // returns true if anything is successfully cancelled + } + } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java index 84b1bb4fee..afbc2853ee 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.util.core.task; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.Semaphore; import org.apache.brooklyn.util.exceptions.Exceptions; @@ -59,7 +60,8 @@ public Maybe getImmediately() { return Maybe.ofAllowingNull(get()); } catch (Throwable t) { if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null || - Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null) { + Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null || + Exceptions.getFirstThrowableOfType(t, CancellationException.class)!=null) { return Maybe.absent(new UnsupportedOperationException("Immediate value not available", t)); } throw Exceptions.propagate(t); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index f8cb91b66c..3c6d96b14e 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -405,9 +405,14 @@ protected Maybe getMaybeInternal() { if (bailOutAfterImmediateExecution) { throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v); } - } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject o) { + } catch (ImmediateSupplier.ImmediateUnsupportedException e) { + if (bailOutAfterImmediateExecution) { + throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v, e); + } + log.debug("Unable to resolve-immediately for "+description+" ("+v+", unsupported, type "+v.getClass()+"); falling back to executing with timeout: "+e); + } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject e) { // ignore, continue below - log.debug("Unable to resolve-immediately for "+description+" ("+v+", wrong type "+v.getClass()+"); falling back to executing with timeout"); + log.debug("Unable to resolve-immediately for "+description+" ("+v+", not supported for type "+v.getClass()+"); falling back to executing with timeout: "+e); } } @@ -559,17 +564,14 @@ public Object call() throws Exception { } } + /** tries to get immediately, then resolve recursively (including for casting) if {@link #recursive} is set + * + * @throws InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject + * ImmediateSupplier.ImmediateUnsupportedException + * if underlying call to {@link ExecutionContext#getImmediately(Object)} does so */ protected Maybe execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) { - Maybe result; - try { - result = exec.getImmediately(immediateSupplierOrImmediateTask); - } catch (ImmediateSupplier.ImmediateUnsupportedException e) { - return null; - } - // let InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject - // bet thrown, and caller who cares will catch that to know it can continue + Maybe result = exec.getImmediately(immediateSupplierOrImmediateTask); - // Recurse: need to ensure returned value is cast, etc return (result.isPresent()) ? recursive ? new ValueResolver(result.get(), type, this).getMaybe() diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java index 550d4751af..64cb024439 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java @@ -23,8 +23,10 @@ import static org.testng.Assert.fail; import java.util.Arrays; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; @@ -39,6 +41,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Callables; /** @@ -270,6 +274,48 @@ public void run() { }); } + public void testTaskFactoryGetImmediatelyDoesNotBlockWithNestedTasks() { + final int NUM_CALLS = 3; + final AtomicInteger executingCount = new AtomicInteger(); + final List> outerTasks = Lists.newArrayList(); + + TaskFactory> taskFactory = new TaskFactory>() { + @Override public Task newTask() { + SequentialTask result = new SequentialTask<>(ImmutableList.of(new Callable() { + public String call() { + executingCount.incrementAndGet(); + try { + Time.sleep(Duration.ONE_MINUTE); + return "myval"; + } finally { + executingCount.decrementAndGet(); + } + }})); + outerTasks.add(result); + return result; + } + }; + for (int i = 0; i < NUM_CALLS; i++) { + Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe(); + Asserts.assertTrue(result.isAbsent(), "result="+result); + } + // the call below default times out after 30s while the task above is still running + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertEquals(outerTasks.size(), NUM_CALLS); + for (Task task : outerTasks) { + Asserts.assertTrue(task.isDone()); + Asserts.assertTrue(task.isCancelled()); + } + } + }); + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertEquals(executingCount.get(), 0); + } + }); + } + private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier, DeferredSupplier { private final boolean failImmediately; From e6fd10c35ee34910c9a1a0fd8748ade66b5d32f5 Mon Sep 17 00:00:00 2001 From: Alex Heneveld Date: Thu, 2 Mar 2017 12:15:39 +0000 Subject: [PATCH 15/15] fix import (wrong Supplier) --- .../apache/brooklyn/util/core/task/BasicExecutionContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 2ff4dc820f..f35a68af4c 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -28,7 +28,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionContext; @@ -48,6 +47,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; /**