diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
index 4540240ec3..344907a45a 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/ExecutionContext.java
@@ -24,6 +24,9 @@
import java.util.concurrent.Executor;
import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.util.guava.Maybe;
+
+import com.google.common.annotations.Beta;
/**
* This is a Brooklyn extension to the Java {@link Executor}.
@@ -64,4 +67,18 @@ public interface ExecutionContext extends Executor {
boolean isShutdown();
+ /**
+ * Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
+ * It may throw an error if it cannot be determined whether a value is available immediately or not.
+ *
+ * Implementations will typically attempt to execute in the current thread, with appropriate
+ * tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but
+ * if needed they may block.
+ *
+ * Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics.
+ */
+ // TODO reference ImmediateSupplier when that class is moved to utils project
+ @Beta
+ Maybe getImmediately(Object callableOrSupplierOrTask);
+
}
\ No newline at end of file
diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
index 9de33405b7..867c108dd3 100644
--- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
+++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/methods/DslComponent.java
@@ -48,6 +48,7 @@
import org.apache.brooklyn.util.core.task.DeferredSupplier;
import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.TaskBuilder;
+import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
@@ -206,6 +207,15 @@ public Maybe getImmediately() {
}
}
+ @Override
+ public Entity get() {
+ try {
+ return call();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
@Override
public Entity call() throws Exception {
return callImpl(false).get();
@@ -219,7 +229,7 @@ protected Maybe getEntity(boolean immediate) {
return Maybe.of(scopeComponent.get());
}
} else {
- return Maybe.of(entity());
+ return Maybe.ofDisallowingNull(entity()).or(Maybe.absent("Context entity not available when trying to evaluate Brooklyn DSL"));
}
}
@@ -311,10 +321,11 @@ protected Maybe callImpl(boolean immediate) throws Exception {
return Maybe.of(result.get());
}
- // TODO may want to block and repeat on new entities joining?
- throw new NoSuchElementException("No entity matching id " + desiredComponentId+
+ // could be nice if DSL has an extra .block() method to allow it to wait for a matching entity.
+ // previously we threw if nothing existed; now we return an absent with a detailed error
+ return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+
(scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+
- (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")));
+ (scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))));
}
private ExecutionContext getExecutionContext() {
@@ -538,14 +549,11 @@ protected String resolveKeyName(boolean immediately) {
@Override
public final Maybe getImmediately() {
- Maybe targetEntityMaybe = component.getImmediately();
- if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
- EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();
-
- String keyNameS = resolveKeyName(true);
- ConfigKey> key = targetEntity.getEntityType().getConfigKey(keyNameS);
- Maybe> result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
- return Maybe.cast(result);
+ Maybe maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallableReturningImmediateMaybeOrNonImmediateValue(true));
+ // the answer will be wrapped twice due to the callable semantics;
+ // the inner present/absent is important; it will only get an outer absent if interrupted
+ if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe;
+ return Maybe.cast( (Maybe>) maybeWrappedMaybe.get() );
}
@Override
@@ -554,15 +562,55 @@ public Task newTask() {
.displayName("retrieving config for "+keyName)
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.dynamic(false)
- .body(new Callable() {
- @Override
- public Object call() throws Exception {
- Entity targetEntity = component.get();
- String keyNameS = resolveKeyName(true);
- ConfigKey> key = targetEntity.getEntityType().getConfigKey(keyNameS);
- return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
- }})
- .build();
+ .body(newCallableReturningImmediateMaybeOrNonImmediateValue(false)).build();
+ }
+
+ private Callable newCallableReturningImmediateMaybeOrNonImmediateValue(final boolean immediate) {
+ return new Callable() {
+ @Override
+ public Object call() throws Exception {
+ Entity targetEntity;
+ if (immediate) {
+ Maybe targetEntityMaybe = component.getImmediately();
+ if (targetEntityMaybe.isAbsent()) return Maybe.cast(targetEntityMaybe);
+ targetEntity = (EntityInternal) targetEntityMaybe.get();
+ } else {
+ targetEntity = component.get();
+ }
+
+ // this is always run in a new dedicated task (possibly a fake task if immediate), so no need to clear
+ checkAndTagForRecursiveReference(targetEntity);
+
+ String keyNameS = resolveKeyName(true);
+ ConfigKey> key = targetEntity.getEntityType().getConfigKey(keyNameS);
+ if (key==null) key = ConfigKeys.newConfigKey(Object.class, keyNameS);
+ if (immediate) {
+ return ((EntityInternal)targetEntity).config().getNonBlocking(key);
+ } else {
+ return targetEntity.getConfig(key);
+ }
+ }
+ };
+ }
+
+ private void checkAndTagForRecursiveReference(Entity targetEntity) {
+ String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')";
+ Task> ancestor = Tasks.current();
+ if (ancestor!=null) {
+ // don't check on ourself; only look higher in hierarchy;
+ // this assumes impls always spawn new tasks (which they do, just maybe not always in new threads)
+ // but it means it does not rely on tag removal to prevent weird errors,
+ // and more importantly it makes the strategy idempotent
+ ancestor = ancestor.getSubmittedByTask();
+ }
+ while (ancestor!=null) {
+ if (TaskTags.hasTag(ancestor, tag)) {
+ throw new IllegalStateException("Recursive config reference "+tag);
+ }
+ ancestor = ancestor.getSubmittedByTask();
+ }
+
+ Tasks.addTagDynamically(tag);
}
@Override
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
index 07fc36a85e..1686f55d9f 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
@@ -28,8 +28,14 @@
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.entity.TestEntity;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -44,7 +50,6 @@
public class ConfigYamlTest extends AbstractYamlTest {
- @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class);
private ExecutorService executor;
@@ -91,6 +96,61 @@ public void testConfigInConfigBlock() throws Exception {
assertNull(entity.getMyField()); // field with @SetFromFlag
assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias
}
+
+
+ @Test
+ public void testRecursiveConfigFailsGracefully() throws Exception {
+ doTestRecursiveConfigFailsGracefully(false);
+ }
+
+ @Test
+ public void testRecursiveConfigImmediateFailsGracefully() throws Exception {
+ doTestRecursiveConfigFailsGracefully(true);
+ }
+
+ protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception {
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " infinite_loop: $brooklyn:config(\"infinite_loop\")");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());
+
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Time.sleep(Duration.FIVE_SECONDS);
+ // error, loop wasn't interrupted or detected
+ LOG.warn("Timeout elapsed, destroying items; usage: "+
+ ((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString());
+ Entities.destroy(app);
+ } catch (RuntimeInterruptedException e) {
+ // expected on normal execution; clear the interrupted flag to prevent ugly further warnings being logged
+ Thread.interrupted();
+ }
+ }
+ });
+ t.start();
+ try {
+ String c;
+ if (immediate) {
+ // this should throw rather than return "absent", because the error is definitive (absent means couldn't resolve in time)
+ c = entity.config().getNonBlocking(ConfigKeys.newStringConfigKey("infinite_loop")).or("FAILED");
+ } else {
+ c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop"));
+ }
+ Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c);
+ } catch (Exception e) {
+ Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive");
+ } finally {
+ if (!Entities.isManaged(app)) {
+ t.interrupt();
+ }
+ }
+ }
@Test
public void testConfigAtTopLevel() throws Exception {
diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
index d387920f37..63aba8e84d 100644
--- a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
+++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/DslTest.java
@@ -48,6 +48,7 @@
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception {
@Test
public void testEntityNotFound() throws Exception {
BrooklynDslDeferredSupplier> dsl = BrooklynDslCommon.entity("myIdDoesNotExist");
+ Maybe> actualValue = execDslImmediately(dsl, Entity.class, app, true);
+ Assert.assertTrue(actualValue.isAbsent());
try {
- Maybe> actualValue = execDslImmediately(dsl, Entity.class, app, true);
+ actualValue.get();
Asserts.shouldHaveFailedPreviously("actual="+actualValue);
} catch (Exception e) {
- NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class);
- if (nsee == null) {
- throw e;
- }
+ Asserts.expectedFailureOfType(e, NoSuchElementException.class);
}
}
@@ -365,7 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) {
return this;
}
- @SuppressWarnings("unused") // included for completeness?
+ @SuppressWarnings("unused") // kept in case useful for additional tests, for completeness
public DslTestWorker wrapInTaskForImmediately(boolean val) {
wrapInTaskForImmediately = val;
return this;
diff --git a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java
index 2d92617516..b736beb27e 100644
--- a/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/config/internal/AbstractConfigMapImpl.java
@@ -30,6 +30,7 @@
import javax.annotation.Nullable;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.config.ConfigInheritance;
import org.apache.brooklyn.config.ConfigInheritances;
@@ -231,7 +232,7 @@ public Maybe getConfigRaw(ConfigKey> key, boolean includeInherited) {
}
protected Object coerceConfigVal(ConfigKey> key, Object v) {
- if ((v instanceof Future) || (v instanceof DeferredSupplier)) {
+ if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) {
// no coercion for these (coerce on exit)
return v;
} else if (key instanceof StructuredConfigKey) {
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 2280be7e7b..7d721d04a3 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -80,6 +80,8 @@ public class BrooklynTaskTags extends TaskTags {
* and that it need not appear in some task lists;
* often used for framework lifecycle events and sensor polling */
public static final String TRANSIENT_TASK_TAG = "TRANSIENT";
+ /** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */
+ public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE";
// ------------- entity tags -------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
index ce10c86957..796ab137da 100644
--- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractConfigurationSupportInternal.java
@@ -146,7 +146,6 @@ protected Maybe getNonBlockingResolvingSimple(ConfigKey key) {
.immediately(true)
.deep(true)
.context(getContext())
- .swallowExceptions()
.get();
return (resolved != marker)
? TypeCoercions.tryCoerce(resolved, key.getTypeToken())
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index 38f1b5a4ce..f35a68af4c 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -41,10 +41,13 @@
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException;
+import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
+import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
/**
@@ -96,7 +99,55 @@ public ExecutionManager getExecutionManager() {
/** returns tasks started by this context (or tasks which have all the tags on this object) */
@Override
public Set> getTasks() { return executionManager.getTasksWithAllTags(tags); }
-
+
+ /** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
+ * currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances;
+ * with tasks if it is submitted or in progress,
+ * it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and
+ * uses that; with such a job, or any other callable/supplier/runnable, it runs that
+ * in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if
+ * given a task) set temporarily in the current thread context */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Maybe getImmediately(Object callableOrSupplier) {
+ BasicTask> fakeTaskForContext;
+ if (callableOrSupplier instanceof BasicTask) {
+ fakeTaskForContext = (BasicTask>)callableOrSupplier;
+ if (fakeTaskForContext.isQueuedOrSubmitted()) {
+ if (fakeTaskForContext.isDone()) {
+ return Maybe.of((T)fakeTaskForContext.getUnchecked());
+ } else {
+ throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
+ }
+ }
+ callableOrSupplier = fakeTaskForContext.getJob();
+ } else {
+ fakeTaskForContext = new BasicTask(MutableMap.of("displayName", "immediate evaluation"));
+ }
+ fakeTaskForContext.tags.addAll(tags);
+ fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
+ fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
+
+ Task> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
+ BasicExecutionContext oldExecutionContext = getCurrentExecutionContext();
+ registerPerThreadExecutionContext();
+
+ if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
+ fakeTaskForContext.cancel();
+ try {
+ BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
+
+ if (!(callableOrSupplier instanceof ImmediateSupplier)) {
+ callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
+ }
+ return ((ImmediateSupplier)callableOrSupplier).getImmediately();
+
+ } finally {
+ BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
+ perThreadExecutionContext.set(oldExecutionContext);
+ }
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected Task submitInternal(Map,?> propertiesQ, final Object task) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index 51f1b67a6a..8b59498907 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -591,7 +591,7 @@ public boolean cancel(TaskCancellationMode mode) {
if (!task.isCancelled()) result |= ((TaskInternal)task).cancel(mode);
result |= delegate().cancel(mode.isAllowedToInterruptTask());
- if (mode.isAllowedToInterruptAllSubmittedTasks() || mode.isAllowedToInterruptDependentSubmittedTasks()) {
+ if (mode.isAllowedToInterruptDependentSubmittedTasks()) {
int subtasksFound=0;
int subtasksReallyCancelled=0;
@@ -753,7 +753,10 @@ protected void beforeStartAtomicTask(Map,?> flags, Task> task) {
/** invoked in a task's thread when a task is starting to run (may be some time after submitted),
* but before doing any of the task's work, so that we can update bookkeeping and notify callbacks */
protected void internalBeforeStart(Map,?> flags, Task> task) {
- activeTaskCount.incrementAndGet();
+ int count = activeTaskCount.incrementAndGet();
+ if (count % 1000==0) {
+ log.warn("High number of active tasks: task #"+count+" is "+task);
+ }
//set thread _before_ start time, so we won't get a null thread when there is a start-time
if (log.isTraceEnabled()) log.trace(""+this+" beforeStart, task: "+task + " running on thread " + Thread.currentThread().getName());
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java
index 72dfb442f4..db1722981e 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/CompoundTask.java
@@ -128,4 +128,19 @@ public List> getChildren() {
return (List) getChildrenTyped();
}
+ @Override
+ protected boolean doCancel(org.apache.brooklyn.util.core.task.TaskInternal.TaskCancellationMode mode) {
+ boolean result = false;
+ if (mode.isAllowedToInterruptDependentSubmittedTasks()) {
+ for (Task> t: getChildren()) {
+ if (!t.isDone()) {
+ result = ((TaskInternal>)t).cancel(mode) || result;
+ }
+ }
+ }
+ result = super.doCancel(mode) || result;
+ return result;
+ // returns true if anything is successfully cancelled
+ }
+
}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
index 1b421b0b25..2869ff98de 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/DynamicSequentialTask.java
@@ -160,9 +160,11 @@ public void queue(Task> t) {
@Override
protected boolean doCancel(TaskCancellationMode mode) {
boolean result = false;
- if (mode.isAllowedToInterruptDependentSubmittedTasks() || mode.isAllowedToInterruptAllSubmittedTasks()) {
- for (Task> t: secondaryJobsAll)
+ if (mode.isAllowedToInterruptDependentSubmittedTasks()) {
+ for (Task> t: secondaryJobsAll) {
+ // secondary jobs are dependent
result = ((TaskInternal>)t).cancel(mode) || result;
+ }
}
return super.doCancel(mode) || result;
// returns true if anything is successfully cancelled
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
index ac0aae4e4e..5ec8d68d1b 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ImmediateSupplier.java
@@ -20,14 +20,17 @@
import org.apache.brooklyn.util.guava.Maybe;
+import com.google.common.base.Supplier;
+
/**
- * A class that supplies objects of a single type, without blocking for any significant length
- * of time.
+ * A {@link Supplier} that has an extra method capable of supplying a value immediately or an absent if definitely not available,
+ * or throwing an {@link ImmediateUnsupportedException} if it cannot determine whether a value is immediately available.
*/
-public interface ImmediateSupplier {
+public interface ImmediateSupplier extends Supplier {
/**
- * Indicates that we are unable to get the value immediately, because that is not supported
+ * Indicates that a supplier does not support immediate evaluation,
+ * i.e. it may need to block to evaluate even if there is a value available
* (e.g. because the supplier is composed of sub-tasks that do not support {@link ImmediateSupplier}.
*/
public static class ImmediateUnsupportedException extends UnsupportedOperationException {
@@ -44,7 +47,7 @@ public ImmediateUnsupportedException(String message, Throwable cause) {
/**
* Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
*
- * @throws ImmediateUnsupportedException if cannot determinte the value immediately
+ * @throws ImmediateUnsupportedException if cannot determine whether a value is immediately available
*/
Maybe getImmediately();
}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
new file mode 100644
index 0000000000..afbc2853ee
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplier.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Semaphore;
+
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.ReferenceWithError;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.guava.Maybe;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Supplier;
+
+/**
+ * Wraps a {@link Supplier} as an {@link ImmediateSupplier} by interrupting the thread before calling {@link Supplier#get()}.
+ * If the call succeeds, the result is returned.
+ * If the call throws any trace including an {@link InterruptedException} or {@link RuntimeInterruptedException}
+ * (ie the call failed due to the interruption, typically because it tried to wait)
+ * then this class concludes that there is no value available immediately and returns {@link Maybe#absent()}.
+ * If the call throws any other error, that is returned.
+ * The interruption is cleared afterwards (unless the thread was interrupted when the method was entered).
+ *
+ * Note that some "immediate" methods, such as {@link Semaphore#acquire()} when a semaphore is available,
+ * will throw if the thread is interrupted. Typically there are workarounds, for instance:
+ * if (semaphore.tryAcquire()) semaphore.acquire();.
+ */
+@Beta
+public class InterruptingImmediateSupplier implements ImmediateSupplier, DeferredSupplier {
+
+ private final Supplier nestedSupplier;
+
+ public InterruptingImmediateSupplier(Supplier nestedSupplier) {
+ this.nestedSupplier = nestedSupplier;
+ }
+
+ @Override
+ public Maybe getImmediately() {
+ boolean interrupted = Thread.currentThread().isInterrupted();
+ try {
+ if (!interrupted) Thread.currentThread().interrupt();
+ return Maybe.ofAllowingNull(get());
+ } catch (Throwable t) {
+ if (Exceptions.getFirstThrowableOfType(t, InterruptedException.class)!=null ||
+ Exceptions.getFirstThrowableOfType(t, RuntimeInterruptedException.class)!=null ||
+ Exceptions.getFirstThrowableOfType(t, CancellationException.class)!=null) {
+ return Maybe.absent(new UnsupportedOperationException("Immediate value not available", t));
+ }
+ throw Exceptions.propagate(t);
+ } finally {
+ if (!interrupted) Thread.interrupted();
+ }
+ }
+
+ @Override
+ public T get() {
+ return nestedSupplier.get();
+ }
+
+ public static InterruptingImmediateSupplier of(final Object o) {
+ return InterruptingImmediateSupplier.ofSafe(o).get();
+ }
+
+ @SuppressWarnings("unchecked")
+ public static ReferenceWithError> ofSafe(final Object o) {
+ if (o instanceof Supplier) {
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier((Supplier)o));
+ } else if (o instanceof Callable) {
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() {
+ @Override
+ public T get() {
+ try {
+ return ((Callable)o).call();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+ }));
+ } else if (o instanceof Runnable) {
+ return ReferenceWithError.newInstanceWithoutError(new InterruptingImmediateSupplier(new Supplier() {
+ @Override
+ public T get() {
+ ((Runnable)o).run();
+ return null;
+ }
+ }));
+ } else {
+ return ReferenceWithError.newInstanceThrowingError(null, new InterruptingImmediateSupplierNotSupportedForObject(o));
+ }
+ }
+
+ public static class InterruptingImmediateSupplierNotSupportedForObject extends UnsupportedOperationException {
+ private static final long serialVersionUID = 307517409005386500L;
+
+ public InterruptingImmediateSupplierNotSupportedForObject(Object o) {
+ super("Type "+o.getClass()+" not supported as InterruptingImmediateSupplier (instance "+o+")");
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
index 99c2773d64..f565aa06b3 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskInternal.java
@@ -143,6 +143,9 @@ private TaskCancellationMode(boolean mayInterruptIfRunning, boolean interruptSub
this.allowedToInterruptTask = mayInterruptIfRunning;
this.allowedToInterruptDependentSubmittedTasks = interruptSubmittedTransients;
this.allowedToInterruptAllSubmittedTasks = interruptAllSubmitted;
+
+ // if dependent isn't set, then all shouldn't be set
+ assert !(this.allowedToInterruptAllSubmittedTasks && !this.allowedToInterruptDependentSubmittedTasks);
}
public boolean isAllowedToInterruptTask() { return allowedToInterruptTask; }
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java
index 6b64a6b0f6..4319796810 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/TaskTags.java
@@ -62,6 +62,7 @@ public static boolean isInessential(Task> task) {
}
public static boolean hasTag(Task> task, Object tag) {
+ if (task==null) return false;
return task.getTags().contains(tag);
}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index f81594eba8..3c6d96b14e 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -29,6 +29,7 @@
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.core.entity.EntityInternal;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
@@ -322,6 +323,10 @@ public Maybe getMaybe() {
return result;
}
+ protected boolean isEvaluatingImmediately() {
+ return immediately || BrooklynTaskTags.hasTag(Tasks.current(), BrooklynTaskTags.IMMEDIATE_TASK_TAG);
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
protected Maybe getMaybeInternal() {
if (started.getAndSet(true))
@@ -352,33 +357,78 @@ protected Maybe getMaybeInternal() {
//if the expected type is a closure or map and that's what we have, we're done (or if it's null);
//but not allowed to return a future or DeferredSupplier as the resolved value
- if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v)))
+ if (v==null || (!forceDeep && type.isInstance(v) && !Future.class.isInstance(v) && !DeferredSupplier.class.isInstance(v) && !TaskFactory.class.isInstance(v)))
return Maybe.of((T) v);
try {
- if (immediately && v instanceof ImmediateSupplier) {
- final ImmediateSupplier> supplier = (ImmediateSupplier>) v;
+ boolean allowImmediateExecution = false;
+ boolean bailOutAfterImmediateExecution = false;
+
+ if (v instanceof ImmediateSupplier) {
+ allowImmediateExecution = true;
+
+ } else {
+ if ((v instanceof TaskFactory>) && !(v instanceof DeferredSupplier)) {
+ v = ((TaskFactory>)v).newTask();
+ allowImmediateExecution = true;
+ bailOutAfterImmediateExecution = true;
+ BrooklynTaskTags.setTransient(((TaskAdaptable>)v).asTask());
+ if (isEvaluatingImmediately()) {
+ // not needed if executing immediately
+ BrooklynTaskTags.addTagDynamically( ((TaskAdaptable>)v).asTask(), BrooklynTaskTags.IMMEDIATE_TASK_TAG );
+ }
+ }
+
+ //if it's a task or a future, we wait for the task to complete
+ if (v instanceof TaskAdaptable>) {
+ v = ((TaskAdaptable>) v).asTask();
+ }
+ }
+
+ if (allowImmediateExecution && isEvaluatingImmediately()) {
+ // TODO could allow for everything, when evaluating immediately -- but if the target isn't safe to run again
+ // then we have to fail if immediate didn't work; to avoid breaking semantics we only do that for a few cases;
+ // might be nice to get to the point where we can break those semantics however,
+ // ie weakening what getImmediate supports and making it be non-blocking, so that bailOut=true is the default.
+ // if: v instanceof TaskFactory -- it is safe, it's a new API (but it is currently the only one supported);
+ // more than safe, we have to do it -- or add code here to cancel tasks -- because it spawns new tasks
+ // (other objects passed through here don't get cancelled, because other things might try again later;
+ // ie a task or future passed in here might naturally be long-running so cancelling is wrong,
+ // but with a task factory generated task it would leak if we submitted and didn't cancel!)
+ // if: v instanceof ImmediateSupplier -- it probably is safe to change to bailOut = true ?
+ // if: v instanceof Task or other things -- it currently isn't safe, there are places where
+ // we expect to getImmediate on things which don't support it nicely,
+ // and we rely on the blocking-short-wait behaviour, e.g. QuorumChecks in ConfigYamlTest
try {
- Maybe> result = supplier.getImmediately();
-
- // Recurse: need to ensure returned value is cast, etc
- return (result.isPresent())
- ? recursive
- ? new ValueResolver(result.get(), type, this).getMaybe()
- : result
- : Maybe.absent();
+ Maybe result = execImmediate(exec, v);
+ if (result!=null) return result;
+ if (bailOutAfterImmediateExecution) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v);
+ }
} catch (ImmediateSupplier.ImmediateUnsupportedException e) {
- log.debug("Unable to resolve-immediately for "+description+" ("+v+"); falling back to executing with timeout", e);
+ if (bailOutAfterImmediateExecution) {
+ throw new ImmediateSupplier.ImmediateUnsupportedException("Cannot get immediately: "+v, e);
+ }
+ log.debug("Unable to resolve-immediately for "+description+" ("+v+", unsupported, type "+v.getClass()+"); falling back to executing with timeout: "+e);
+ } catch (InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject e) {
+ // ignore, continue below
+ log.debug("Unable to resolve-immediately for "+description+" ("+v+", not supported for type "+v.getClass()+"); falling back to executing with timeout: "+e);
}
}
- //if it's a task or a future, we wait for the task to complete
- if (v instanceof TaskAdaptable>) {
+ if (v instanceof Task) {
//if it's a task, we make sure it is submitted
- if (!((TaskAdaptable>) v).asTask().isSubmitted() ) {
- if (exec==null)
+ Task> task = (Task>) v;
+ if (!task.isSubmitted()) {
+ if (exec==null) {
return Maybe.absent("Value for unsubmitted task '"+getDescription()+"' requested but no execution context available");
- exec.submit(((TaskAdaptable>) v).asTask());
+ }
+ if (!task.getTags().contains(BrooklynTaskTags.TRANSIENT_TASK_TAG)) {
+ // mark this non-transient, because this value is usually something set e.g. in config
+ // (should discourage this in favour of task factories which can be transiently interrupted?)
+ BrooklynTaskTags.addTagDynamically(task, BrooklynTaskTags.NON_TRANSIENT_TASK_TAG);
+ }
+ exec.submit(task);
}
}
@@ -493,7 +543,9 @@ public Object call() throws Exception {
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
- IllegalArgumentException problem = new IllegalArgumentException("Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec+": "+e, e);
+ String msg = "Error resolving "+(description!=null ? description+", " : "")+v+", in "+exec;
+ String eTxt = Exceptions.collapseText(e);
+ IllegalArgumentException problem = eTxt.startsWith(msg) ? new IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e);
if (swallowExceptions) {
if (log.isDebugEnabled())
log.debug("Resolution of "+this+" failed, swallowing and returning: "+e);
@@ -512,6 +564,21 @@ public Object call() throws Exception {
}
}
+ /** tries to get immediately, then resolve recursively (including for casting) if {@link #recursive} is set
+ *
+ * @throws InterruptingImmediateSupplier.InterruptingImmediateSupplierNotSupportedForObject
+ * ImmediateSupplier.ImmediateUnsupportedException
+ * if underlying call to {@link ExecutionContext#getImmediately(Object)} does so */
+ protected Maybe execImmediate(ExecutionContext exec, Object immediateSupplierOrImmediateTask) {
+ Maybe result = exec.getImmediately(immediateSupplierOrImmediateTask);
+
+ return (result.isPresent())
+ ? recursive
+ ? new ValueResolver(result.get(), type, this).getMaybe()
+ : result
+ : result;
+ }
+
protected String getDescription() {
return description!=null ? description : ""+value;
}
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java
index 38f5f90301..2f40fe9c30 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntityConfigTest.java
@@ -22,12 +22,14 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,6 +38,7 @@
import org.apache.brooklyn.api.entity.ImplementedBy;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.config.ConfigPredicates;
@@ -44,16 +47,22 @@
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.entity.stock.BasicEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.core.flags.SetFromFlag;
import org.apache.brooklyn.util.core.task.BasicTask;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
+import org.apache.brooklyn.util.core.task.InterruptingImmediateSupplier;
+import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -63,6 +72,8 @@
public class EntityConfigTest extends BrooklynAppUnitTestSupport {
+ private static final Logger log = LoggerFactory.getLogger(EntityConfigTest.class);
+
private static final int TIMEOUT_MS = 10*1000;
private ExecutorService executor;
@@ -244,57 +255,196 @@ public void testGetConfigMapWithSubValueAsStringNotCoerced() throws Exception {
// of the previous "test.confMapThing.obj".
//
// Presumably an earlier call to task.get() timed out, causing it to cancel the task?
+ // Alex: yes, a task.cancel is performed for maps in
+ // AbstractEntity$BasicConfigurationSupport(AbstractConfigurationSupportInternal).getNonBlockingResolvingStructuredKey(ConfigKey)
+
+ //
// I (Aled) question whether we want to support passing a task (rather than a
// DeferredSupplier or TaskFactory, for example). Our EntitySpec.configure is overloaded
// to take a Task, but that feels wrong!?
- @Test(groups="Broken")
- public void testGetTaskNonBlocking() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- Task task = Tasks.builder().body(
+ //
+ // If starting clean I (Alex) would agree, we should use TaskFactory. However the
+ // DependentConfiguration methods -- including the ubiquitous AttributeWhenReady --
+ // return Task instances so they should survive a getNonBlocking or get with a short timeout
+ // access, and if a value is subsequently available it should be returned
+ // (which this test asserts, but is currently failing). If TaskFactory is used the
+ // intended semantics are clear -- you create a new task on each access, and can interrupt it
+ // and discard it if needed. For a Task it's less clear: probably the semantics are that the
+ // first returned value is what the value is forevermore. Probably it should not be interrupted
+ // on a non-blocking / short-wait access, or possibly it should simply be re-run if a previous
+ // execution was interrupted (but take care if we have a simultaneous non-blocking and blocking
+ // access, if the first one interrupts the second one should still get a value).
+ // I tend to think ideally we should switch to using TaskFactory in DependentConfiguration.
+ class ConfigNonBlockingFixture {
+ final Semaphore latch = new Semaphore(0);
+ final String expectedVal = "myval";
+ Object blockingVal;
+ List> tasksMadeByFactory = MutableList.of();
+
+ protected ConfigNonBlockingFixture usingTask() {
+ blockingVal = taskFactory().newTask();
+ return this;
+ }
+
+ protected ConfigNonBlockingFixture usingTaskFactory() {
+ blockingVal = taskFactory();
+ return this;
+ }
+
+ protected ConfigNonBlockingFixture usingDeferredSupplier() {
+ blockingVal = deferredSupplier();
+ return this;
+ }
+
+ protected ConfigNonBlockingFixture usingImmediateSupplier() {
+ blockingVal = new InterruptingImmediateSupplier(deferredSupplier());
+ return this;
+ }
+
+ private TaskFactory> taskFactory() {
+ final TaskBuilder tb = Tasks.builder().body(
new Callable() {
@Override
public String call() throws Exception {
- latch.await();
+ if (!latch.tryAcquire()) latch.acquire();
+ latch.release();
return "myval";
- }})
- .build();
- runGetConfigNonBlocking(latch, task, "myval");
- }
-
- @Test
- public void testGetDeferredSupplierNonBlocking() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- DeferredSupplier task = new DeferredSupplier() {
- @Override public String get() {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
+ }});
+ return new TaskFactory>() {
+ @Override
+ public Task newTask() {
+ Task t = tb.build();
+ tasksMadeByFactory.add(t);
+ return t;
}
- return "myval";
- }
- };
- runGetConfigNonBlocking(latch, task, "myval");
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- protected void runGetConfigNonBlocking(CountDownLatch latch, Object blockingVal, String expectedVal) throws Exception {
- TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class)
- .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal))
- .configure((ConfigKey)TestEntity.CONF_NAME, blockingVal));
+ };
+ }
+
+ private DeferredSupplier deferredSupplier() {
+ return new DeferredSupplier() {
+ @Override public String get() {
+ try {
+ log.trace("acquiring");
+ if (!latch.tryAcquire()) latch.acquire();
+ latch.release();
+ log.trace("acquired and released");
+ } catch (InterruptedException e) {
+ log.trace("interrupted");
+ throw Exceptions.propagate(e);
+ }
+ return "myval";
+ }
+ };
+ }
- // Will initially return absent, because task is not done
- assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent());
- assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent());
+ protected void runGetConfigNonBlockingInKey() throws Exception {
+ Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this");
+
+ @SuppressWarnings("unchecked")
+ TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class)
+ .configure((ConfigKey)(ConfigKey>)TestEntity.CONF_NAME, blockingVal));
+
+ log.trace("get non-blocking");
+ // Will initially return absent, because task is not done
+ assertTrue(entity.config().getNonBlocking(TestEntity.CONF_NAME).isAbsent());
+ log.trace("got absent");
+
+ latch.release();
+
+ // Can now finish task, so will return expectedVal
+ log.trace("get blocking");
+ assertEquals(entity.config().get(TestEntity.CONF_NAME), expectedVal);
+ log.trace("got blocking");
+ assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(), expectedVal);
+
+ latch.acquire();
+ log.trace("finished");
+ }
- latch.countDown();
+ protected void runGetConfigNonBlockingInMap() throws Exception {
+ Preconditions.checkNotNull(blockingVal, "Fixture must set blocking val before running this");
+ TestEntity entity = (TestEntity) mgmt.getEntityManager().createEntity(EntitySpec.create(TestEntity.class)
+ .configure(TestEntity.CONF_MAP_OBJ_THING, ImmutableMap.of("mysub", blockingVal)));
+
+ // Will initially return absent, because task is not done
+ assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).isAbsent());
+ assertTrue(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).isAbsent());
+
+ if (blockingVal instanceof TaskFactory) {
+ assertAllOurConfigTasksCancelled();
+ } else {
+ // TaskFactory tasks are cancelled, but others are not,
+ // things (ValueResolver?) are smart enough to know to leave it running
+ assertAllOurConfigTasksNotCancelled();
+ }
+
+ latch.release();
+
+ // Can now finish task, so will return expectedVal
+ assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal));
+ assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal);
+
+ assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal));
+ assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal);
+
+ assertAllTasksDone();
+ }
+
+ private void assertAllOurConfigTasksNotCancelled() {
+ for (Task> t: tasksMadeByFactory) {
+ Assert.assertFalse( t.isCancelled(), "Task should not have been cancelled: "+t+" - "+t.getStatusDetail(false) );
+ }
+ }
- // Can now finish task, so will return expectedVal
- assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING), ImmutableMap.of("mysub", expectedVal));
- assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")), expectedVal);
+ private void assertAllOurConfigTasksCancelled() {
+ // TODO added Feb 2017 - but might need an "eventually" here, if cancel is happening in a BG thread
+ // (but I think it is always foreground)
+ for (Task> t: tasksMadeByFactory) {
+ Assert.assertTrue( t.isCancelled(), "Task should have been cancelled: "+t+" - "+t.getStatusDetail(false) );
+ }
+ }
- assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING).get(), ImmutableMap.of("mysub", expectedVal));
- assertEquals(entity.config().getNonBlocking(TestEntity.CONF_MAP_OBJ_THING.subKey("mysub")).get(), expectedVal);
+ private void assertAllTasksDone() {
+ for (Task> t: tasksMadeByFactory) {
+ Assert.assertTrue( t.isDone(), "Task should have been done: "+t+" - "+t.getStatusDetail(false) );
+ }
+ }
+ }
+
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetTaskNonBlockingKey() throws Exception {
+ new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInKey();
+ }
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetTaskNonBlockingMap() throws Exception {
+ new ConfigNonBlockingFixture().usingTask().runGetConfigNonBlockingInMap();
+ }
+
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetTaskFactoryNonBlockingKey() throws Exception {
+ new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInKey();
+ }
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetTaskFactoryNonBlockingMap() throws Exception {
+ new ConfigNonBlockingFixture().usingTaskFactory().runGetConfigNonBlockingInMap();
+ }
+
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetSupplierNonBlockingKey() throws Exception {
+ new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInKey();
+ }
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetSuppierNonBlockingMap() throws Exception {
+ new ConfigNonBlockingFixture().usingDeferredSupplier().runGetConfigNonBlockingInMap();
+ }
+
+ @Test // fast
+ public void testGetImmediateSupplierNonBlockingKey() throws Exception {
+ new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInKey();
+ }
+ @Test(groups="Integration") // because takes 1s+
+ public void testGetImmediateSupplierNonBlockingMap() throws Exception {
+ new ConfigNonBlockingFixture().usingImmediateSupplier().runGetConfigNonBlockingInMap();
}
@Test
@@ -425,7 +575,7 @@ public String call() {
assertEquals(getConfigFuture.get(TIMEOUT_MS, TimeUnit.MILLISECONDS), "abc");
}
- @Test
+ @Test(groups="Integration") // takes 0.5s
public void testGetConfigWithExecutedTaskWaitsForResult() throws Exception {
LatchingCallable work = new LatchingCallable("abc");
Task task = executionManager.submit(work);
@@ -447,7 +597,7 @@ public String call() {
assertEquals(work.callCount.get(), 1);
}
- @Test
+ @Test(groups="Integration") // takes 0.5s
public void testGetConfigWithUnexecutedTaskIsExecutedAndWaitsForResult() throws Exception {
LatchingCallable work = new LatchingCallable("abc");
Task task = new BasicTask(work);
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java
new file mode 100644
index 0000000000..fe83225816
--- /dev/null
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/InterruptingImmediateSupplierTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.util.concurrent.Callables;
+import com.google.common.util.concurrent.Runnables;
+
+public class InterruptingImmediateSupplierTest {
+
+ @Test(expectedExceptions=UnsupportedOperationException.class)
+ public void testOfInvalidType() throws Exception {
+ InterruptingImmediateSupplier.of("myval");
+ }
+
+ @Test
+ public void testRunnable() throws Exception {
+ assertImmediatelyPresent(Runnables.doNothing(), null);
+ assertImmediatelyAbsent(new SleepingRunnable());
+ assertImmediatelyFails(new FailingRunnable(), MarkerException.class);
+ }
+
+ @Test
+ public void testCallable() throws Exception {
+ assertImmediatelyPresent(Callables.returning("myval"), "myval");
+ assertImmediatelyAbsent(new SleepingCallable());
+ assertImmediatelyFails(new FailingCallable(), MarkerException.class);
+ }
+
+ @Test
+ public void testSupplier() throws Exception {
+ assertImmediatelyPresent(Suppliers.ofInstance("myval"), "myval");
+ assertImmediatelyAbsent(new SleepingSupplier());
+ assertImmediatelyFails(new FailingSupplier(), MarkerException.class);
+ }
+
+ private void assertImmediatelyPresent(Object orig, Object expected) {
+ Maybe result = getImmediately(orig);
+ assertEquals(result.get(), expected);
+ assertFalse(Thread.currentThread().isInterrupted());
+ }
+
+ private void assertImmediatelyAbsent(Object orig) {
+ Maybe result = getImmediately(orig);
+ assertTrue(result.isAbsent(), "result="+result);
+ assertFalse(Thread.currentThread().isInterrupted());
+ }
+
+ private void assertImmediatelyFails(Object orig, Class extends Exception> expected) {
+ try {
+ Maybe result = getImmediately(orig);
+ Asserts.shouldHaveFailedPreviously("result="+result);
+ } catch (Exception e) {
+ Asserts.expectedFailureOfType(e, expected);
+ }
+ assertFalse(Thread.currentThread().isInterrupted());
+ }
+
+ private Maybe getImmediately(Object val) {
+ InterruptingImmediateSupplier supplier = InterruptingImmediateSupplier.of(val);
+ return supplier.getImmediately();
+ }
+
+ public static class SleepingRunnable implements Runnable {
+ @Override public void run() {
+ Time.sleep(Duration.ONE_MINUTE);
+ }
+ }
+
+ public static class SleepingCallable implements Callable {
+ @Override public Void call() {
+ Time.sleep(Duration.ONE_MINUTE);
+ return null;
+ }
+ }
+
+ public static class SleepingSupplier implements Supplier {
+ @Override public Void get() {
+ Time.sleep(Duration.ONE_MINUTE);
+ return null;
+ }
+ }
+
+ public static class FailingRunnable implements Runnable {
+ @Override public void run() {
+ throw new MarkerException();
+ }
+ }
+
+ public static class FailingCallable implements Callable {
+ @Override public Void call() {
+ throw new MarkerException();
+ }
+ }
+
+ public static class FailingSupplier implements Supplier {
+ @Override public Void get() {
+ throw new MarkerException();
+ }
+ }
+
+ public static class MarkerException extends RuntimeException {
+ private static final long serialVersionUID = -3395361406478634652L;
+ }
+}
\ No newline at end of file
diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
index e47e4c9bbc..64cb024439 100644
--- a/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
+++ b/core/src/test/java/org/apache/brooklyn/util/core/task/ValueResolverTest.java
@@ -20,13 +20,17 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.TaskAdaptable;
+import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
import org.apache.brooklyn.test.Asserts;
@@ -37,6 +41,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Callables;
+
/**
* see also {@link TasksTest} for more tests
*/
@@ -137,19 +145,17 @@ public void testDefaultBeforeDelayAndError() {
assertMaybeIsAbsent(result);
Assert.assertEquals(result.get(), "foo");
}
-
+
public void testGetImmediately() {
MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
- assertNull(callInfo.task);
- assertContainsCallingMethod(callInfo.stackTrace, "testGetImmediately");
+ assertImmediateFakeTaskFromMethod(callInfo, "testGetImmediately");
}
public void testImmediateSupplierWithTimeoutUsesBlocking() {
MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier();
CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).timeout(Asserts.DEFAULT_LONG_TIMEOUT).get();
- assertNotNull(callInfo.task);
- assertNotContainsCallingMethod(callInfo.stackTrace, "testImmediateSupplierWithTimeoutUsesBlocking");
+ assertRealTaskNotFromMethod(callInfo, "testImmediateSupplierWithTimeoutUsesBlocking");
}
public void testGetImmediatelyInTask() throws Exception {
@@ -164,16 +170,14 @@ private CallInfo myUniquelyNamedMethod() {
}
});
CallInfo callInfo = task.get();
- assertEquals(callInfo.task, task);
- assertContainsCallingMethod(callInfo.stackTrace, "myUniquelyNamedMethod");
+ assertImmediateFakeTaskFromMethod(callInfo, "myUniquelyNamedMethod");
}
public void testGetImmediatelyFallsBackToDeferredCallInTask() throws Exception {
final MyImmediateAndDeferredSupplier supplier = new MyImmediateAndDeferredSupplier(true);
CallInfo callInfo = Tasks.resolving(supplier).as(CallInfo.class).context(app).immediately(true).get();
- assertNotNull(callInfo.task);
+ assertRealTaskNotFromMethod(callInfo, "testGetImmediatelyFallsBackToDeferredCallInTask");
assertEquals(BrooklynTaskTags.getContextEntity(callInfo.task), app);
- assertNotContainsCallingMethod(callInfo.stackTrace, "testGetImmediatelyFallsBackToDeferredCallInTask");
}
public void testNonRecursiveBlockingFailsOnNonObjectType() throws Exception {
@@ -224,6 +228,94 @@ public void testNonRecursiveImmediately() throws Exception {
assertEquals(result.getClass(), FailingImmediateAndDeferredSupplier.class);
}
+ public void testTaskFactoryGet() {
+ TaskFactory> taskFactory = new TaskFactory>() {
+ @Override public TaskAdaptable newTask() {
+ return new BasicTask<>(Callables.returning("myval"));
+ }
+ };
+ String result = Tasks.resolving(taskFactory).as(String.class).context(app).get();
+ assertEquals(result, "myval");
+ }
+
+ public void testTaskFactoryGetImmediately() {
+ TaskFactory> taskFactory = new TaskFactory>() {
+ @Override public TaskAdaptable newTask() {
+ return new BasicTask<>(Callables.returning("myval"));
+ }
+ };
+ String result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).get();
+ assertEquals(result, "myval");
+ }
+
+ public void testTaskFactoryGetImmediatelyDoesNotBlock() {
+ final AtomicBoolean executing = new AtomicBoolean();
+ TaskFactory> taskFactory = new TaskFactory>() {
+ @Override public TaskAdaptable newTask() {
+ return new BasicTask<>(new Callable() {
+ public String call() {
+ executing.set(true);
+ try {
+ Time.sleep(Duration.ONE_MINUTE);
+ return "myval";
+ } finally {
+ executing.set(false);
+ }
+ }});
+ }
+ };
+ Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe();
+ Asserts.assertTrue(result.isAbsent(), "result="+result);
+ // the call below default times out after 30s while the task above is still running
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ Asserts.assertFalse(executing.get());
+ }
+ });
+ }
+
+ public void testTaskFactoryGetImmediatelyDoesNotBlockWithNestedTasks() {
+ final int NUM_CALLS = 3;
+ final AtomicInteger executingCount = new AtomicInteger();
+ final List> outerTasks = Lists.newArrayList();
+
+ TaskFactory> taskFactory = new TaskFactory>() {
+ @Override public Task> newTask() {
+ SequentialTask> result = new SequentialTask<>(ImmutableList.of(new Callable() {
+ public String call() {
+ executingCount.incrementAndGet();
+ try {
+ Time.sleep(Duration.ONE_MINUTE);
+ return "myval";
+ } finally {
+ executingCount.decrementAndGet();
+ }
+ }}));
+ outerTasks.add(result);
+ return result;
+ }
+ };
+ for (int i = 0; i < NUM_CALLS; i++) {
+ Maybe result = Tasks.resolving(taskFactory).as(String.class).context(app).immediately(true).getMaybe();
+ Asserts.assertTrue(result.isAbsent(), "result="+result);
+ }
+ // the call below default times out after 30s while the task above is still running
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ Asserts.assertEquals(outerTasks.size(), NUM_CALLS);
+ for (Task> task : outerTasks) {
+ Asserts.assertTrue(task.isDone());
+ Asserts.assertTrue(task.isCancelled());
+ }
+ }
+ });
+ Asserts.succeedsEventually(new Runnable() {
+ public void run() {
+ Asserts.assertEquals(executingCount.get(), 0);
+ }
+ });
+ }
+
private static class MyImmediateAndDeferredSupplier implements ImmediateSupplier, DeferredSupplier {
private final boolean failImmediately;
@@ -359,4 +451,18 @@ private void assertNotContainsCallingMethod(StackTraceElement[] stackTrace, Stri
}
}
}
+
+ private void assertImmediateFakeTaskFromMethod(CallInfo callInfo, String method) {
+ // previously task was null, but now there is a "fake task"
+ assertNotNull(callInfo.task);
+ Assert.assertFalse(callInfo.task.isSubmitted());
+ assertContainsCallingMethod(callInfo.stackTrace, method);
+ }
+
+ private void assertRealTaskNotFromMethod(CallInfo callInfo, String method) {
+ assertNotNull(callInfo.task);
+ Assert.assertTrue(callInfo.task.isSubmitted());
+ assertNotContainsCallingMethod(callInfo.stackTrace, method);
+ }
+
}
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java
index ee49b4abde..3b41cf6587 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/exceptions/Exceptions.java
@@ -20,7 +20,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.instanceOf;
-import static com.google.common.base.Throwables.getCausalChain;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
@@ -55,6 +54,8 @@ public class Exceptions {
private static final List> BORING_IF_NO_MESSAGE_THROWABLE_SUPERTYPES = ImmutableList.>of(
PropagatedRuntimeException.class);
+ public static final int MAX_COLLAPSE_RECURSIVE_DEPTH = 100;
+
/** NB: might be useful for stack trace, e.g. {@link ExecutionException} */
private static boolean isBoringForMessage(Throwable t) {
for (Class extends Throwable> type: ALWAYS_BORING_MESSAGE_THROWABLE_SUPERTYPES)
@@ -263,8 +264,41 @@ public static Throwable collapseIncludingAllCausalMessages(Throwable source) {
public static Throwable collapse(Throwable source, boolean collapseCausalChain) {
return collapse(source, collapseCausalChain, false, ImmutableSet.of(), new Object[0]);
}
+
+ /** As {@link Throwables#getCausalChain(Throwable)} but safe in the face of perverse classes which return themselves as their cause or otherwise have a recursive causal chain. */
+ public static List getCausalChain(Throwable t) {
+ Set result = MutableSet.of();
+ while (t!=null) {
+ if (!result.add(t)) break;
+ t = t.getCause();
+ }
+ return ImmutableList.copyOf(result);
+ }
+
+ private static boolean isCausalChainDepthExceeding(Throwable t, int size) {
+ if (size<0) return true;
+ if (t==null) return false;
+ return isCausalChainDepthExceeding(t.getCause(), size-1);
+ }
private static Throwable collapse(Throwable source, boolean collapseCausalChain, boolean includeAllCausalMessages, Set visited, Object contexts[]) {
+ if (visited.isEmpty()) {
+ if (isCausalChainDepthExceeding(source, MAX_COLLAPSE_RECURSIVE_DEPTH)) {
+ // do fast check above, then do deeper check which survives recursive causes
+ List chain = getCausalChain(source);
+ if (chain.size() > MAX_COLLAPSE_RECURSIVE_DEPTH) {
+ // if it's an OOME or other huge stack, shrink it so we don't spin huge cycles processing the trace and printing it
+ // (sometimes generating subsequent OOME's in logback that mask the first!)
+ // coarse heuristic for how to reduce it, but that's better than killing cpu, causing further errors, and suppressing the root cause altogether!
+ String msg = chain.get(0).getMessage();
+ if (msg.length() > 512) msg = msg.substring(0, 500)+"...";
+ return new PropagatedRuntimeException("Huge stack trace (size "+chain.size()+", removing all but last few), "
+ + "starting: "+chain.get(0).getClass().getName()+": "+msg+"; ultimately caused by: ",
+ chain.get(chain.size() - 10));
+ }
+ }
+ }
+
visited = MutableSet.copyOf(visited);
String message = "";
Throwable collapsed = source;
diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java
index 65b5d91248..272d3f6df2 100644
--- a/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java
+++ b/utils/common/src/test/java/org/apache/brooklyn/util/exceptions/ExceptionsTest.java
@@ -34,6 +34,7 @@
import org.testng.annotations.Test;
import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
public class ExceptionsTest {
@@ -306,6 +307,25 @@ public void testNestedExecAndProp() {
Assert.assertEquals(Exceptions.collapseText(t), "IOException: 1");
}
+ @Test
+ public void testGetCausalChain() throws Exception {
+ Exception e1 = new Exception("e1");
+ Exception e2 = new Exception("e2", e1);
+ assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1));
+ }
+
+ @Test
+ public void testGetCausalChainRecursive() throws Exception {
+ Exception e1 = new Exception("e1") {
+ private static final long serialVersionUID = 1L;
+ public synchronized Throwable getCause() {
+ return this;
+ }
+ };
+ Exception e2 = new Exception("e2", e1);
+ assertEquals(Exceptions.getCausalChain(e2), ImmutableList.of(e2, e1));
+ }
+
@Test
public void testComplexJcloudsExample() {
Throwable t;