diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java index 4540240ec3..344907a45a 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java @@ -24,6 +24,9 @@ import java.util.concurrent.Executor; import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.util.guava.Maybe; + +import com.google.common.annotations.Beta; /** * This is a Brooklyn extension to the Java {@link Executor}. @@ -64,4 +67,18 @@ public interface ExecutionContext extends Executor { boolean isShutdown(); + /** + * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available. + * It may throw an error if it cannot be determined whether a value is available immediately or not. + *

+ * Implementations will typically attempt to execute in the current thread, with appropriate + * tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but + * if needed they may block. + *

+ * Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics. + */ + // TODO reference ImmediateSupplier when that class is moved to utils project + @Beta + Maybe getImmediately(Object callableOrSupplierOrTask); + } \ No newline at end of file diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java index 9de33405b7..867c108dd3 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java @@ -48,6 +48,7 @@ import org.apache.brooklyn.util.core.task.DeferredSupplier; import org.apache.brooklyn.util.core.task.ImmediateSupplier; import org.apache.brooklyn.util.core.task.TaskBuilder; +import org.apache.brooklyn.util.core.task.TaskTags; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.groovy.GroovyJavaMethods; @@ -206,6 +207,15 @@ public Maybe getImmediately() { } } + @Override + public Entity get() { + try { + return call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + @Override public Entity call() throws Exception { return callImpl(false).get(); @@ -219,7 +229,7 @@ protected Maybe getEntity(boolean immediate) { return Maybe.of(scopeComponent.get()); } } else { - return Maybe.of(entity()); + return Maybe.ofDisallowingNull(entity()).or(Maybe.absent("Context entity not available when trying to evaluate Brooklyn DSL")); } } @@ -311,10 +321,11 @@ protected Maybe callImpl(boolean immediate) throws Exception { return Maybe.of(result.get()); } - // TODO may want to block and repeat on new entities joining? - throw new NoSuchElementException("No entity matching id " + desiredComponentId+ + // could be nice if DSL has an extra .block() method to allow it to wait for a matching entity. + // previously we threw if nothing existed; now we return an absent with a detailed error + return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+ (scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+ - (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))); + (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")))); } private ExecutionContext getExecutionContext() { @@ -538,14 +549,11 @@ protected String resolveKeyName(boolean immediately) { @Override public final Maybe getImmediately() { - Maybe targetEntityMaybe = component.getImmediately(); - if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available"); - EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get(); - - String keyNameS = resolveKeyName(true); - ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); - Maybe result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS)); - return Maybe.cast(result); + Maybe maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallableReturningImmediateMaybeOrNonImmediateValue(true)); + // the answer will be wrapped twice due to the callable semantics; + // the inner present/absent is important; it will only get an outer absent if interrupted + if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe; + return Maybe.cast( (Maybe) maybeWrappedMaybe.get() ); } @Override @@ -554,15 +562,55 @@ public Task newTask() { .displayName("retrieving config for "+keyName) .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) .dynamic(false) - .body(new Callable() { - @Override - public Object call() throws Exception { - Entity targetEntity = component.get(); - String keyNameS = resolveKeyName(true); - ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); - return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS)); - }}) - .build(); + .body(newCallableReturningImmediateMaybeOrNonImmediateValue(false)).build(); + } + + private Callable newCallableReturningImmediateMaybeOrNonImmediateValue(final boolean immediate) { + return new Callable() { + @Override + public Object call() throws Exception { + Entity targetEntity; + if (immediate) { + Maybe targetEntityMaybe = component.getImmediately(); + if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe); + targetEntity = (EntityInternal) targetEntityMaybe.get(); + } else { + targetEntity = component.get(); + } + + // this is always run in a new dedicated task (possibly a fake task if immediate), so no need to clear + checkAndTagForRecursiveReference(targetEntity); + + String keyNameS = resolveKeyName(true); + ConfigKey key = targetEntity.getEntityType().getConfigKey(keyNameS); + if (key==null) key = ConfigKeys.newConfigKey(Object.class, keyNameS); + if (immediate) { + return ((EntityInternal)targetEntity).config().getNonBlocking(key); + } else { + return targetEntity.getConfig(key); + } + } + }; + } + + private void checkAndTagForRecursiveReference(Entity targetEntity) { + String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')"; + Task ancestor = Tasks.current(); + if (ancestor!=null) { + // don't check on ourself; only look higher in hierarchy; + // this assumes impls always spawn new tasks (which they do, just maybe not always in new threads) + // but it means it does not rely on tag removal to prevent weird errors, + // and more importantly it makes the strategy idempotent + ancestor = ancestor.getSubmittedByTask(); + } + while (ancestor!=null) { + if (TaskTags.hasTag(ancestor, tag)) { + throw new IllegalStateException("Recursive config reference "+tag); + } + ancestor = ancestor.getSubmittedByTask(); + } + + Tasks.addTagDynamically(tag); } @Override diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java index 07fc36a85e..1686f55d9f 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java @@ -28,8 +28,14 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.core.config.ConfigKeys; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext; import org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -44,7 +50,6 @@ public class ConfigYamlTest extends AbstractYamlTest { - @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class); private ExecutorService executor; @@ -91,6 +96,61 @@ public void testConfigInConfigBlock() throws Exception { assertNull(entity.getMyField()); // field with @SetFromFlag assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias } + + + @Test + public void testRecursiveConfigFailsGracefully() throws Exception { + doTestRecursiveConfigFailsGracefully(false); + } + + @Test + public void testRecursiveConfigImmediateFailsGracefully() throws Exception { + doTestRecursiveConfigFailsGracefully(true); + } + + protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception { + String yaml = Joiner.on("\n").join( + "services:", + "- type: org.apache.brooklyn.core.test.entity.TestEntity", + " brooklyn.config:", + " infinite_loop: $brooklyn:config(\"infinite_loop\")"); + + final Entity app = createStartWaitAndLogApplication(yaml); + TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + Time.sleep(Duration.FIVE_SECONDS); + // error, loop wasn't interrupted or detected + LOG.warn("Timeout elapsed, destroying items; usage: "+ + ((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString()); + Entities.destroy(app); + } catch (RuntimeInterruptedException e) { + // expected on normal execution; clear the interrupted flag to prevent ugly further warnings being logged + Thread.interrupted(); + } + } + }); + t.start(); + try { + String c; + if (immediate) { + // this should throw rather than return "absent", because the error is definitive (absent means couldn't resolve in time) + c = entity.config().getNonBlocking(ConfigKeys.newStringConfigKey("infinite_loop")).or("FAILED"); + } else { + c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop")); + } + Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c); + } catch (Exception e) { + Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive"); + } finally { + if (!Entities.isManaged(app)) { + t.interrupt(); + } + } + } @Test public void testConfigAtTopLevel() throws Exception { diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java index d387920f37..63aba8e84d 100644 --- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java @@ -48,6 +48,7 @@ import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.text.Identifiers; import org.apache.brooklyn.util.time.Duration; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception { @Test public void testEntityNotFound() throws Exception { BrooklynDslDeferredSupplier dsl = BrooklynDslCommon.entity("myIdDoesNotExist"); + Maybe actualValue = execDslImmediately(dsl, Entity.class, app, true); + Assert.assertTrue(actualValue.isAbsent()); try { - Maybe actualValue = execDslImmediately(dsl, Entity.class, app, true); + actualValue.get(); Asserts.shouldHaveFailedPreviously("actual="+actualValue); } catch (Exception e) { - NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class); - if (nsee == null) { - throw e; - } + Asserts.expectedFailureOfType(e, NoSuchElementException.class); } } @@ -365,7 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) { return this; } - @SuppressWarnings("unused") // included for completeness? + @SuppressWarnings("unused") // kept in case useful for additional tests, for completeness public DslTestWorker wrapInTaskForImmediately(boolean val) { wrapInTaskForImmediately = val; return this; diff --git a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java index 2d92617516..b736beb27e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java +++ b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import org.apache.brooklyn.api.mgmt.ExecutionContext; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.config.ConfigInheritance; import org.apache.brooklyn.config.ConfigInheritances; @@ -231,7 +232,7 @@ public Maybe getConfigRaw(ConfigKey key, boolean includeInherited) { } protected Object coerceConfigVal(ConfigKey key, Object v) { - if ((v instanceof Future) || (v instanceof DeferredSupplier)) { + if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) { // no coercion for these (coerce on exit) return v; } else if (key instanceof StructuredConfigKey) { diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java index 2280be7e7b..7d721d04a3 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java @@ -80,6 +80,8 @@ public class BrooklynTaskTags extends TaskTags { * and that it need not appear in some task lists; * often used for framework lifecycle events and sensor polling */ public static final String TRANSIENT_TASK_TAG = "TRANSIENT"; + /** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */ + public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE"; // ------------- entity tags ------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java index ce10c86957..796ab137da 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java @@ -146,7 +146,6 @@ protected Maybe getNonBlockingResolvingSimple(ConfigKey key) { .immediately(true) .deep(true) .context(getContext()) - .swallowExceptions() .get(); return (resolved != marker) ? TypeCoercions.tryCoerce(resolved, key.getTypeToken()) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index 38f1b5a4ce..f35a68af4c 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -41,10 +41,13 @@ import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity; import org.apache.brooklyn.core.mgmt.entitlement.Entitlements; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException; +import org.apache.brooklyn.util.guava.Maybe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; /** @@ -96,7 +99,55 @@ public ExecutionManager getExecutionManager() { /** returns tasks started by this context (or tasks which have all the tags on this object) */ @Override public Set> getTasks() { return executionManager.getTasksWithAllTags(tags); } - + + /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context; + * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances; + * with tasks if it is submitted or in progress, + * it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and + * uses that; with such a job, or any other callable/supplier/runnable, it runs that + * in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if + * given a task) set temporarily in the current thread context */ + @SuppressWarnings("unchecked") + @Override + public Maybe getImmediately(Object callableOrSupplier) { + BasicTask fakeTaskForContext; + if (callableOrSupplier instanceof BasicTask) { + fakeTaskForContext = (BasicTask)callableOrSupplier; + if (fakeTaskForContext.isQueuedOrSubmitted()) { + if (fakeTaskForContext.isDone()) { + return Maybe.of((T)fakeTaskForContext.getUnchecked()); + } else { + throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext); + } + } + callableOrSupplier = fakeTaskForContext.getJob(); + } else { + fakeTaskForContext = new BasicTask(MutableMap.of("displayName", "immediate evaluation")); + } + fakeTaskForContext.tags.addAll(tags); + fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG); + fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG); + + Task previousTask = BasicExecutionManager.getPerThreadCurrentTask().get(); + BasicExecutionContext oldExecutionContext = getCurrentExecutionContext(); + registerPerThreadExecutionContext(); + + if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask); + fakeTaskForContext.cancel(); + try { + BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext); + + if (!(callableOrSupplier instanceof ImmediateSupplier)) { + callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier); + } + return ((ImmediateSupplier)callableOrSupplier).getImmediately(); + + } finally { + BasicExecutionManager.getPerThreadCurrentTask().set(previousTask); + perThreadExecutionContext.set(oldExecutionContext); + } + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected Task submitInternal(Map propertiesQ, final Object task) { diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index 51f1b67a6a..8b59498907 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -591,7 +591,7 @@ public boolean cancel(TaskCancellationMode mode) { if (!task.isCancelled()) result |= ((TaskInternal)task).cancel(mode); result |= delegate().cancel(mode.isAllowedToInterruptTask()); - if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) { + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { int subtasksFound=0; int subtasksReallyCancelled=0; @@ -753,7 +753,10 @@ protected void beforeStartAtomicTask(Map flags, Task task) { /** invoked in a task's thread when a task is starting to run (may be some time after submitted), * but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */ protected void internalBeforeStart(Map flags, Task task) { - activeTaskCount.incrementAndGet(); + int count = activeTaskCount.incrementAndGet(); + if (count % 1000==0) { + log.warn("High number of active tasks: task #"+count+" is "+task); + } //set thread _before_ start time, so we won't get a null thread when there is a start-time if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task + " running on thread " + Thread.currentThread().getName()); diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java index 72dfb442f4..db1722981e 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java @@ -128,4 +128,19 @@ public List> getChildren() { return (List) getChildrenTyped(); } + @Override + protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) { + boolean result = false; + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { + for (Task t: getChildren()) { + if (!t.isDone()) { + result = ((TaskInternal)t).cancel(mode) || result; + } + } + } + result = super.doCancel(mode) || result; + return result; + // returns true if anything is successfully cancelled + } + } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java index 1b421b0b25..2869ff98de 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java @@ -160,9 +160,11 @@ public void queue(Task t) { @Override protected boolean doCancel(TaskCancellationMode mode) { boolean result = false; - if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) { - for (Task t: secondaryJobsAll) + if (mode.isAllowedToInterruptDependentSubmittedTasks()) { + for (Task t: secondaryJobsAll) { + // secondary jobs are dependent result = ((TaskInternal)t).cancel(mode) || result; + } } return super.doCancel(mode) || result; // returns true if anything is successfully cancelled diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java index ac0aae4e4e..5ec8d68d1b 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java @@ -20,14 +20,17 @@ import org.apache.brooklyn.util.guava.Maybe; +import com.google.common.base.Supplier; + /** - * A class that supplies objects of a single type, without blocking for any significant length - * of time. + * A {@link Supplier} that has an extra method capable of supplying a value immediately or an absent if definitely not available, + * or throwing an {@link ImmediateUnsupportedException} if it cannot determine whether a value is immediately available. */ -public interface ImmediateSupplier { +public interface ImmediateSupplier extends Supplier { /** - * Indicates that we are unable to get the value immediately, because that is not supported + * Indicates that a supplier does not support immediate evaluation, + * i.e. it may need to block to evaluate even if there is a value available * (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}. */ public static class ImmediateUnsupportedException extends UnsupportedOperationException { @@ -44,7 +47,7 @@ public ImmediateUnsupportedException(String message, Throwable cause) { /** * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available. * - * @throws ImmediateUnsupportedException if cannot determinte the value immediately + * @throws ImmediateUnsupportedException if cannot determine whether a value is immediately available */ Maybe getImmediately(); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java new file mode 100644 index 0000000000..afbc2853ee --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.core.task; + +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Semaphore; + +import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.ReferenceWithError; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; +import org.apache.brooklyn.util.guava.Maybe; + +import com.google.common.annotations.Beta; +import com.google.common.base.Supplier; + +/** + * Wraps a {@link Supplier} as an {@link ImmediateSupplier} by interrupting the thread before calling {@link Supplier#get()}. + * If the call succeeds, the result is returned. + * If the call throws any trace including an {@link InterruptedException} or {@link RuntimeInterruptedException} + * (ie the call failed due to the interruption, typically because it tried to wait) + * then this class concludes that there is no value available immediately and returns {@link Maybe#absent()}. + * If the call throws any other error, that is returned. + * The interruption is cleared afterwards (unless the thread was interrupted when the method was entered). + *

+ * Note that some "immediate" methods, such as {@link Semaphore#acquire()} when a semaphore is available, + * will throw if the thread is interrupted. Typically there are workarounds, for instance: + * if (semaphore.tryAcquire()) semaphore.acquire();. + */ +@Beta +public class InterruptingImmediateSupplier implements ImmediateSupplier, DeferredSupplier { + + private final Supplier nestedSupplier; + + public InterruptingImmediateSupplier(Supplier nestedSupplier) { + this.nestedSupplier = nestedSupplier; + } + + @Override + public Maybe getImmediately() { + boolean interrupted = Thread.currentThread().isInterrupted(); + try { + if (!interrupted) Thread.currentThread().interrupt(); + return Maybe.ofAllowingNull(get()); + } catch (Throwable t) { + if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null || + Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null || + Exceptions.getFirstThrowableOfType(t, CancellationException.class)!=null) { + return Maybe.absent(new UnsupportedOperationException("Immediate value not available", t)); + } + throw Exceptions.propagate(t); + } finally { + if (!interrupted) Thread.interrupted(); + } + } + + @Override + public T get() { + return nestedSupplier.get(); + } + + public static InterruptingImmediateSupplier of(final Object o) { + return InterruptingImmediateSupplier.ofSafe(o).get(); + } + + @SuppressWarnings("unchecked") + public static ReferenceWithError> ofSafe(final Object o) { + if (o instanceof Supplier) { + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier((Supplier)o)); + } else if (o instanceof Callable) { + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() { + @Override + public T get() { + try { + return ((Callable)o).call(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + })); + } else if (o instanceof Runnable) { + return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() { + @Override + public T get() { + ((Runnable)o).run(); + return null; + } + })); + } else { + return ReferenceWithError.newInstanceThrowingError(null, new InterruptingImmediateSupplierNotSupportedForObject(o)); + } + } + + public static class InterruptingImmediateSupplierNotSupportedForObject extends UnsupportedOperationException { + private static final long serialVersionUID = 307517409005386500L; + + public InterruptingImmediateSupplierNotSupportedForObject(Object o) { + super("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")"); + } + } +} diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java index 99c2773d64..f565aa06b3 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java @@ -143,6 +143,9 @@ private TaskCancellationMode(boolean mayInterruptIfRunning, boolean interruptSub this.allowedToInterruptTask = mayInterruptIfRunning; this.allowedToInterruptDependentSubmittedTasks = interruptSubmittedTransients; this.allowedToInterruptAllSubmittedTasks = interruptAllSubmitted; + + // if dependent isn't set, then all shouldn't be set + assert !(this.allowedToInterruptAllSubmittedTasks && !this.allowedToInterruptDependentSubmittedTasks); } public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java index 6b64a6b0f6..4319796810 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java @@ -62,6 +62,7 @@ public static boolean isInessential(Task task) { } public static boolean hasTag(Task task, Object tag) { + if (task==null) return false; return task.getTags().contains(tag); } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java index f81594eba8..3c6d96b14e 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java @@ -29,6 +29,7 @@ import org.apache.brooklyn.api.mgmt.ExecutionContext; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.util.core.flags.TypeCoercions; @@ -322,6 +323,10 @@ public Maybe getMaybe() { return result; } + protected boolean isEvaluatingImmediately() { + return immediately || BrooklynTaskTags.hasTag(Tasks.current(), BrooklynTaskTags.IMMEDIATE_TASK_TAG); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) protected Maybe getMaybeInternal() { if (started.getAndSet(true)) @@ -352,33 +357,78 @@ protected Maybe getMaybeInternal() { //if the expected type is a closure or map and that's what we have, we're done (or if it's null); //but not allowed to return a future or DeferredSupplier as the resolved value - if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v))) + if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v) && !TaskFactory.class.isInstance(v))) return Maybe.of((T) v); try { - if (immediately && v instanceof ImmediateSupplier) { - final ImmediateSupplier supplier = (ImmediateSupplier) v; + boolean allowImmediateExecution = false; + boolean bailOutAfterImmediateExecution = false; + + if (v instanceof ImmediateSupplier) { + allowImmediateExecution = true; + + } else { + if ((v instanceof TaskFactory) && !(v instanceof DeferredSupplier)) { + v = ((TaskFactory)v).newTask(); + allowImmediateExecution = true; + bailOutAfterImmediateExecution = true; + BrooklynTaskTags.setTransient(((TaskAdaptable)v).asTask()); + if (isEvaluatingImmediately()) { + // not needed if executing immediately + BrooklynTaskTags.addTagDynamically( ((TaskAdaptable)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG ); + } + } + + //if it's a task or a future, we wait for the task to complete + if (v instanceof TaskAdaptable) { + v = ((TaskAdaptable) v).asTask(); + } + } + + if (allowImmediateExecution && isEvaluatingImmediately()) { + // TODO could allow for everything, when evaluating immediately -- but if the target isn't safe to run again + // then we have to fail if immediate didn't work; to avoid breaking semantics we only do that for a few cases; + // might be nice to get to the point where we can break those semantics however, + // ie weakening what getImmediate supports and making it be non-blocking, so that bailOut=true is the default. + // if: v instanceof TaskFactory -- it is safe, it's a new API (but it is currently the only one supported); + // more than safe, we have to do it -- or add code here to cancel tasks -- because it spawns new tasks + // (other objects passed through here don't get cancelled, because other things might try again later; + // ie a task or future passed in here might naturally be long-running so cancelling is wrong, + // but with a task factory generated task it would leak if we submitted and didn't cancel!) + // if: v instanceof ImmediateSupplier -- it probably is safe to change to bailOut = true ? + // if: v instanceof Task or other things -- it currently isn't safe, there are places where + // we expect to getImmediate on things which don't support it nicely, + // and we rely on the blocking-short-wait behaviour, e.g. QuorumChecks in ConfigYamlTest try { - Maybe result = supplier.getImmediately(); - - // Recurse: need to ensure returned value is cast, etc - return (result.isPresent()) - ? recursive - ? new ValueResolver(result.get(), type, this).getMaybe() - : result - : Maybe.absent(); + Maybe result = execImmediate(exec, v); + if (result!=null) return result; + if (bailOutAfterImmediateExecution) { + throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v); + } } catch (ImmediateSupplier.ImmediateUnsupportedException e) { - log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e); + if (bailOutAfterImmediateExecution) { + throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v, e); + } + log.debug("Unable to resolve-immediately for "+description+" ("+v+", unsupported, type "+v.getClass()+"); falling back to executing with timeout: "+e); + } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject e) { + // ignore, continue below + log.debug("Unable to resolve-immediately for "+description+" ("+v+", not supported for type "+v.getClass()+"); falling back to executing with timeout: "+e); } } - //if it's a task or a future, we wait for the task to complete - if (v instanceof TaskAdaptable) { + if (v instanceof Task) { //if it's a task, we make sure it is submitted - if (!((TaskAdaptable) v).asTask().isSubmitted() ) { - if (exec==null) + Task task = (Task) v; + if (!task.isSubmitted()) { + if (exec==null) { return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available"); - exec.submit(((TaskAdaptable) v).asTask()); + } + if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) { + // mark this non-transient, because this value is usually something set e.g. in config + // (should discourage this in favour of task factories which can be transiently interrupted?) + BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG); + } + exec.submit(task); } } @@ -493,7 +543,9 @@ public Object call() throws Exception { } catch (Exception e) { Exceptions.propagateIfFatal(e); - IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e); + String msg = "Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec; + String eTxt = Exceptions.collapseText(e); + IllegalArgumentException problem = eTxt.startsWith(msg) ? new IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e); if (swallowExceptions) { if (log.isDebugEnabled()) log.debug("Resolution of "+this+" failed, swallowing and returning: "+e); @@ -512,6 +564,21 @@ public Object call() throws Exception { } } + /** tries to get immediately, then resolve recursively (including for casting) if {@link #recursive} is set + * + * @throws InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject + * ImmediateSupplier.ImmediateUnsupportedException + * if underlying call to {@link ExecutionContext#getImmediately(Object)} does so */ + protected Maybe execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) { + Maybe result = exec.getImmediately(immediateSupplierOrImmediateTask); + + return (result.isPresent()) + ? recursive + ? new ValueResolver(result.get(), type, this).getMaybe() + : result + : result; + } + protected String getDescription() { return description!=null ? description : ""+value; } diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java index 38f5f90301..2f40fe9c30 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java @@ -22,12 +22,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -36,6 +38,7 @@ import org.apache.brooklyn.api.entity.ImplementedBy; import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.config.ConfigKey; import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.config.ConfigPredicates; @@ -44,16 +47,22 @@ import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.entity.stock.BasicEntity; import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.flags.SetFromFlag; import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.DeferredSupplier; +import org.apache.brooklyn.util.core.task.InterruptingImmediateSupplier; +import org.apache.brooklyn.util.core.task.TaskBuilder; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -63,6 +72,8 @@ public class EntityConfigTest extends BrooklynAppUnitTestSupport { + private static final Logger log = LoggerFactory.getLogger(EntityConfigTest.class); + private static final int TIMEOUT_MS = 10*1000; private ExecutorService executor; @@ -244,57 +255,196 @@ public void testGetConfigMapWithSubValueAsStringNotCoerced() throws Exception { // of the previous "test.confMapThing.obj". // // Presumably an earlier call to task.get() timed out, causing it to cancel the task? + // Alex: yes, a task.cancel is performed for maps in + // AbstractEntity$BasicConfigurationSupport(AbstractConfigurationSupportInternal).getNonBlockingResolvingStructuredKey(ConfigKey) + + // // I (Aled) question whether we want to support passing a task (rather than a // DeferredSupplier or TaskFactory, for example). Our EntitySpec.configure is overloaded // to take a Task, but that feels wrong!? - @Test(groups="Broken") - public void testGetTaskNonBlocking() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - Task task = Tasks.builder().body( + // + // If starting clean I (Alex) would agree, we should use TaskFactory. However the + // DependentConfiguration methods -- including the ubiquitous AttributeWhenReady -- + // return Task instances so they should survive a getNonBlocking or get with a short timeout + // access, and if a value is subsequently available it should be returned + // (which this test asserts, but is currently failing). If TaskFactory is used the + // intended semantics are clear -- you create a new task on each access, and can interrupt it + // and discard it if needed. For a Task it's less clear: probably the semantics are that the + // first returned value is what the value is forevermore. Probably it should not be interrupted + // on a non-blocking / short-wait access, or possibly it should simply be re-run if a previous + // execution was interrupted (but take care if we have a simultaneous non-blocking and blocking + // access, if the first one interrupts the second one should still get a value). + // I tend to think ideally we should switch to using TaskFactory in DependentConfiguration. + class ConfigNonBlockingFixture { + final Semaphore latch = new Semaphore(0); + final String expectedVal = "myval"; + Object blockingVal; + List> tasksMadeByFactory = MutableList.of(); + + protected ConfigNonBlockingFixture usingTask() { + blockingVal = taskFactory().newTask(); + return this; + } + + protected ConfigNonBlockingFixture usingTaskFactory() { + blockingVal = taskFactory(); + return this; + } + + protected ConfigNonBlockingFixture usingDeferredSupplier() { + blockingVal = deferredSupplier(); + return this; + } + + protected ConfigNonBlockingFixture usingImmediateSupplier() { + blockingVal = new InterruptingImmediateSupplier(deferredSupplier()); + return this; + } + + private TaskFactory> taskFactory() { + final TaskBuilder tb = Tasks.builder().body( new Callable() { @Override public String call() throws Exception { - latch.await(); + if (!latch.tryAcquire()) latch.acquire(); + latch.release(); return "myval"; - }}) - .build(); - runGetConfigNonBlocking(latch, task, "myval"); - } - - @Test - public void testGetDeferredSupplierNonBlocking() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - DeferredSupplier task = new DeferredSupplier() { - @Override public String get() { - try { - latch.await(); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); + }}); + return new TaskFactory>() { + @Override + public Task newTask() { + Task t = tb.build(); + tasksMadeByFactory.add(t); + return t; } - return "myval"; - } - }; - runGetConfigNonBlocking(latch, task, "myval"); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - protected void runGetConfigNonBlocking(CountDownLatch latch, Object blockingVal, String expectedVal) throws Exception { - TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) - .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal)) - .configure((ConfigKey)TestEntity.CONF_NAME, blockingVal)); + }; + } + + private DeferredSupplier deferredSupplier() { + return new DeferredSupplier() { + @Override public String get() { + try { + log.trace("acquiring"); + if (!latch.tryAcquire()) latch.acquire(); + latch.release(); + log.trace("acquired and released"); + } catch (InterruptedException e) { + log.trace("interrupted"); + throw Exceptions.propagate(e); + } + return "myval"; + } + }; + } - // Will initially return absent, because task is not done - assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent()); - assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent()); + protected void runGetConfigNonBlockingInKey() throws Exception { + Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this"); + + @SuppressWarnings("unchecked") + TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) + .configure((ConfigKey)(ConfigKey)TestEntity.CONF_NAME, blockingVal)); + + log.trace("get non-blocking"); + // Will initially return absent, because task is not done + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_NAME).isAbsent()); + log.trace("got absent"); + + latch.release(); + + // Can now finish task, so will return expectedVal + log.trace("get blocking"); + assertEquals(entity.config().get(TestEntity.CONF_NAME), expectedVal); + log.trace("got blocking"); + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), expectedVal); + + latch.acquire(); + log.trace("finished"); + } - latch.countDown(); + protected void runGetConfigNonBlockingInMap() throws Exception { + Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this"); + TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal))); + + // Will initially return absent, because task is not done + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent()); + assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent()); + + if (blockingVal instanceof TaskFactory) { + assertAllOurConfigTasksCancelled(); + } else { + // TaskFactory tasks are cancelled, but others are not, + // things (ValueResolver?) are smart enough to know to leave it running + assertAllOurConfigTasksNotCancelled(); + } + + latch.release(); + + // Can now finish task, so will return expectedVal + assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal)); + assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal); + + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal)); + assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal); + + assertAllTasksDone(); + } + + private void assertAllOurConfigTasksNotCancelled() { + for (Task t: tasksMadeByFactory) { + Assert.assertFalse( t.isCancelled(), "Task should not have been cancelled: "+t+" - "+t.getStatusDetail(false) ); + } + } - // Can now finish task, so will return expectedVal - assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal)); - assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal); + private void assertAllOurConfigTasksCancelled() { + // TODO added Feb 2017 - but might need an "eventually" here, if cancel is happening in a BG thread + // (but I think it is always foreground) + for (Task t: tasksMadeByFactory) { + Assert.assertTrue( t.isCancelled(), "Task should have been cancelled: "+t+" - "+t.getStatusDetail(false) ); + } + } - assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal)); - assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal); + private void assertAllTasksDone() { + for (Task t: tasksMadeByFactory) { + Assert.assertTrue( t.isDone(), "Task should have been done: "+t+" - "+t.getStatusDetail(false) ); + } + } + } + + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey(); + } + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap(); + } + + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskFactoryNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey(); + } + @Test(groups="Integration") // because takes 1s+ + public void testGetTaskFactoryNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap(); + } + + @Test(groups="Integration") // because takes 1s+ + public void testGetSupplierNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey(); + } + @Test(groups="Integration") // because takes 1s+ + public void testGetSuppierNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap(); + } + + @Test // fast + public void testGetImmediateSupplierNonBlockingKey() throws Exception { + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey(); + } + @Test(groups="Integration") // because takes 1s+ + public void testGetImmediateSupplierNonBlockingMap() throws Exception { + new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap(); } @Test @@ -425,7 +575,7 @@ public String call() { assertEquals(getConfigFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), "abc"); } - @Test + @Test(groups="Integration") // takes 0.5s public void testGetConfigWithExecutedTaskWaitsForResult() throws Exception { LatchingCallable work = new LatchingCallable("abc"); Task task = executionManager.submit(work); @@ -447,7 +597,7 @@ public String call() { assertEquals(work.callCount.get(), 1); } - @Test + @Test(groups="Integration") // takes 0.5s public void testGetConfigWithUnexecutedTaskIsExecutedAndWaitsForResult() throws Exception { LatchingCallable work = new LatchingCallable("abc"); Task task = new BasicTask(work); diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java new file mode 100644 index 0000000000..fe83225816 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.util.core.task; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; +import org.testng.annotations.Test; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.util.concurrent.Callables; +import com.google.common.util.concurrent.Runnables; + +public class InterruptingImmediateSupplierTest { + + @Test(expectedExceptions=UnsupportedOperationException.class) + public void testOfInvalidType() throws Exception { + InterruptingImmediateSupplier.of("myval"); + } + + @Test + public void testRunnable() throws Exception { + assertImmediatelyPresent(Runnables.doNothing(), null); + assertImmediatelyAbsent(new SleepingRunnable()); + assertImmediatelyFails(new FailingRunnable(), MarkerException.class); + } + + @Test + public void testCallable() throws Exception { + assertImmediatelyPresent(Callables.returning("myval"), "myval"); + assertImmediatelyAbsent(new SleepingCallable()); + assertImmediatelyFails(new FailingCallable(), MarkerException.class); + } + + @Test + public void testSupplier() throws Exception { + assertImmediatelyPresent(Suppliers.ofInstance("myval"), "myval"); + assertImmediatelyAbsent(new SleepingSupplier()); + assertImmediatelyFails(new FailingSupplier(), MarkerException.class); + } + + private void assertImmediatelyPresent(Object orig, Object expected) { + Maybe result = getImmediately(orig); + assertEquals(result.get(), expected); + assertFalse(Thread.currentThread().isInterrupted()); + } + + private void assertImmediatelyAbsent(Object orig) { + Maybe result = getImmediately(orig); + assertTrue(result.isAbsent(), "result="+result); + assertFalse(Thread.currentThread().isInterrupted()); + } + + private void assertImmediatelyFails(Object orig, Class expected) { + try { + Maybe result = getImmediately(orig); + Asserts.shouldHaveFailedPreviously("result="+result); + } catch (Exception e) { + Asserts.expectedFailureOfType(e, expected); + } + assertFalse(Thread.currentThread().isInterrupted()); + } + + private Maybe getImmediately(Object val) { + InterruptingImmediateSupplier supplier = InterruptingImmediateSupplier.of(val); + return supplier.getImmediately(); + } + + public static class SleepingRunnable implements Runnable { + @Override public void run() { + Time.sleep(Duration.ONE_MINUTE); + } + } + + public static class SleepingCallable implements Callable { + @Override public Void call() { + Time.sleep(Duration.ONE_MINUTE); + return null; + } + } + + public static class SleepingSupplier implements Supplier { + @Override public Void get() { + Time.sleep(Duration.ONE_MINUTE); + return null; + } + } + + public static class FailingRunnable implements Runnable { + @Override public void run() { + throw new MarkerException(); + } + } + + public static class FailingCallable implements Callable { + @Override public Void call() { + throw new MarkerException(); + } + } + + public static class FailingSupplier implements Supplier { + @Override public Void get() { + throw new MarkerException(); + } + } + + public static class MarkerException extends RuntimeException { + private static final long serialVersionUID = -3395361406478634652L; + } +} \ No newline at end of file diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java index e47e4c9bbc..64cb024439 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java @@ -20,13 +20,17 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; import static org.testng.Assert.fail; import java.util.Arrays; +import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.mgmt.TaskAdaptable; +import org.apache.brooklyn.api.mgmt.TaskFactory; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.test.Asserts; @@ -37,6 +41,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Callables; + /** * see also {@link TasksTest} for more tests */ @@ -137,19 +145,17 @@ public void testDefaultBeforeDelayAndError() { assertMaybeIsAbsent(result); Assert.assertEquals(result.get(), "foo"); } - + public void testGetImmediately() { MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get(); - assertNull(callInfo.task); - assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately"); + assertImmediateFakeTaskFromMethod(callInfo, "testGetImmediately"); } public void testImmediateSupplierWithTimeoutUsesBlocking() { MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get(); - assertNotNull(callInfo.task); - assertNotContainsCallingMethod(callInfo.stackTrace, "testImmediateSupplierWithTimeoutUsesBlocking"); + assertRealTaskNotFromMethod(callInfo, "testImmediateSupplierWithTimeoutUsesBlocking"); } public void testGetImmediatelyInTask() throws Exception { @@ -164,16 +170,14 @@ private CallInfo myUniquelyNamedMethod() { } }); CallInfo callInfo = task.get(); - assertEquals(callInfo.task, task); - assertContainsCallingMethod(callInfo.stackTrace, "myUniquelyNamedMethod"); + assertImmediateFakeTaskFromMethod(callInfo, "myUniquelyNamedMethod"); } public void testGetImmediatelyFallsBackToDeferredCallInTask() throws Exception { final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(true); CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get(); - assertNotNull(callInfo.task); + assertRealTaskNotFromMethod(callInfo, "testGetImmediatelyFallsBackToDeferredCallInTask"); assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app); - assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediatelyFallsBackToDeferredCallInTask"); } public void testNonRecursiveBlockingFailsOnNonObjectType() throws Exception { @@ -224,6 +228,94 @@ public void testNonRecursiveImmediately() throws Exception { assertEquals(result.getClass(), FailingImmediateAndDeferredSupplier.class); } + public void testTaskFactoryGet() { + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(Callables.returning("myval")); + } + }; + String result = Tasks.resolving(taskFactory).as(String.class).context(app).get(); + assertEquals(result, "myval"); + } + + public void testTaskFactoryGetImmediately() { + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(Callables.returning("myval")); + } + }; + String result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).get(); + assertEquals(result, "myval"); + } + + public void testTaskFactoryGetImmediatelyDoesNotBlock() { + final AtomicBoolean executing = new AtomicBoolean(); + TaskFactory> taskFactory = new TaskFactory>() { + @Override public TaskAdaptable newTask() { + return new BasicTask<>(new Callable() { + public String call() { + executing.set(true); + try { + Time.sleep(Duration.ONE_MINUTE); + return "myval"; + } finally { + executing.set(false); + } + }}); + } + }; + Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe(); + Asserts.assertTrue(result.isAbsent(), "result="+result); + // the call below default times out after 30s while the task above is still running + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertFalse(executing.get()); + } + }); + } + + public void testTaskFactoryGetImmediatelyDoesNotBlockWithNestedTasks() { + final int NUM_CALLS = 3; + final AtomicInteger executingCount = new AtomicInteger(); + final List> outerTasks = Lists.newArrayList(); + + TaskFactory> taskFactory = new TaskFactory>() { + @Override public Task newTask() { + SequentialTask result = new SequentialTask<>(ImmutableList.of(new Callable() { + public String call() { + executingCount.incrementAndGet(); + try { + Time.sleep(Duration.ONE_MINUTE); + return "myval"; + } finally { + executingCount.decrementAndGet(); + } + }})); + outerTasks.add(result); + return result; + } + }; + for (int i = 0; i < NUM_CALLS; i++) { + Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe(); + Asserts.assertTrue(result.isAbsent(), "result="+result); + } + // the call below default times out after 30s while the task above is still running + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertEquals(outerTasks.size(), NUM_CALLS); + for (Task task : outerTasks) { + Asserts.assertTrue(task.isDone()); + Asserts.assertTrue(task.isCancelled()); + } + } + }); + Asserts.succeedsEventually(new Runnable() { + public void run() { + Asserts.assertEquals(executingCount.get(), 0); + } + }); + } + private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier, DeferredSupplier { private final boolean failImmediately; @@ -359,4 +451,18 @@ private void assertNotContainsCallingMethod(StackTraceElement[] stackTrace, Stri } } } + + private void assertImmediateFakeTaskFromMethod(CallInfo callInfo, String method) { + // previously task was null, but now there is a "fake task" + assertNotNull(callInfo.task); + Assert.assertFalse(callInfo.task.isSubmitted()); + assertContainsCallingMethod(callInfo.stackTrace, method); + } + + private void assertRealTaskNotFromMethod(CallInfo callInfo, String method) { + assertNotNull(callInfo.task); + Assert.assertTrue(callInfo.task.isSubmitted()); + assertNotContainsCallingMethod(callInfo.stackTrace, method); + } + } diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java index ee49b4abde..3b41cf6587 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Predicates.instanceOf; -import static com.google.common.base.Throwables.getCausalChain; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; @@ -55,6 +54,8 @@ public class Exceptions { private static final List> BORING_IF_NO_MESSAGE_THROWABLE_SUPERTYPES = ImmutableList.>of( PropagatedRuntimeException.class); + public static final int MAX_COLLAPSE_RECURSIVE_DEPTH = 100; + /** NB: might be useful for stack trace, e.g. {@link ExecutionException} */ private static boolean isBoringForMessage(Throwable t) { for (Class type: ALWAYS_BORING_MESSAGE_THROWABLE_SUPERTYPES) @@ -263,8 +264,41 @@ public static Throwable collapseIncludingAllCausalMessages(Throwable source) { public static Throwable collapse(Throwable source, boolean collapseCausalChain) { return collapse(source, collapseCausalChain, false, ImmutableSet.of(), new Object[0]); } + + /** As {@link Throwables#getCausalChain(Throwable)} but safe in the face of perverse classes which return themselves as their cause or otherwise have a recursive causal chain. */ + public static List getCausalChain(Throwable t) { + Set result = MutableSet.of(); + while (t!=null) { + if (!result.add(t)) break; + t = t.getCause(); + } + return ImmutableList.copyOf(result); + } + + private static boolean isCausalChainDepthExceeding(Throwable t, int size) { + if (size<0) return true; + if (t==null) return false; + return isCausalChainDepthExceeding(t.getCause(), size-1); + } private static Throwable collapse(Throwable source, boolean collapseCausalChain, boolean includeAllCausalMessages, Set visited, Object contexts[]) { + if (visited.isEmpty()) { + if (isCausalChainDepthExceeding(source, MAX_COLLAPSE_RECURSIVE_DEPTH)) { + // do fast check above, then do deeper check which survives recursive causes + List chain = getCausalChain(source); + if (chain.size() > MAX_COLLAPSE_RECURSIVE_DEPTH) { + // if it's an OOME or other huge stack, shrink it so we don't spin huge cycles processing the trace and printing it + // (sometimes generating subsequent OOME's in logback that mask the first!) + // coarse heuristic for how to reduce it, but that's better than killing cpu, causing further errors, and suppressing the root cause altogether! + String msg = chain.get(0).getMessage(); + if (msg.length() > 512) msg = msg.substring(0, 500)+"..."; + return new PropagatedRuntimeException("Huge stack trace (size "+chain.size()+", removing all but last few), " + + "starting: "+chain.get(0).getClass().getName()+": "+msg+"; ultimately caused by: ", + chain.get(chain.size() - 10)); + } + } + } + visited = MutableSet.copyOf(visited); String message = ""; Throwable collapsed = source; diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java index 65b5d91248..272d3f6df2 100644 --- a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java +++ b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java @@ -34,6 +34,7 @@ import org.testng.annotations.Test; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; public class ExceptionsTest { @@ -306,6 +307,25 @@ public void testNestedExecAndProp() { Assert.assertEquals(Exceptions.collapseText(t), "IOException: 1"); } + @Test + public void testGetCausalChain() throws Exception { + Exception e1 = new Exception("e1"); + Exception e2 = new Exception("e2", e1); + assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1)); + } + + @Test + public void testGetCausalChainRecursive() throws Exception { + Exception e1 = new Exception("e1") { + private static final long serialVersionUID = 1L; + public synchronized Throwable getCause() { + return this; + } + }; + Exception e2 = new Exception("e2", e1); + assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1)); + } + @Test public void testComplexJcloudsExample() { Throwable t;