Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.concurrent.Executor;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.util.guava.Maybe;

import com.google.common.annotations.Beta;

/**
* This is a Brooklyn extension to the Java {@link Executor}.
Expand Down Expand Up @@ -64,4 +67,18 @@ public interface ExecutionContext extends Executor {

boolean isShutdown();

/**
* Gets the value promptly, or returns {@link Maybe#absent()} if the value is not yet available.
* It may throw an error if it cannot be determined whether a value is available immediately or not.
* <p>
* Implementations will typically attempt to execute in the current thread, with appropriate
* tricks to make it look like it is in a sub-thread, and will attempt to be non-blocking but
* if needed they may block.
* <p>
* Supports {@link Callable} and {@link Runnable} and some {@link Task} targets to be evaluated with "immediate" semantics.
*/
// TODO reference ImmediateSupplier when that class is moved to utils project
@Beta
<T> Maybe<T> getImmediately(Object callableOrSupplierOrTask);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.brooklyn.util.core.task.DeferredSupplier;
import org.apache.brooklyn.util.core.task.ImmediateSupplier;
import org.apache.brooklyn.util.core.task.TaskBuilder;
import org.apache.brooklyn.util.core.task.TaskTags;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.groovy.GroovyJavaMethods;
Expand Down Expand Up @@ -206,6 +207,15 @@ public Maybe<Entity> getImmediately() {
}
}

@Override
public Entity get() {
try {
return call();
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}

@Override
public Entity call() throws Exception {
return callImpl(false).get();
Expand All @@ -219,7 +229,7 @@ protected Maybe<Entity> getEntity(boolean immediate) {
return Maybe.of(scopeComponent.get());
}
} else {
return Maybe.<Entity>of(entity());
return Maybe.<Entity>ofDisallowingNull(entity()).or(Maybe.<Entity>absent("Context entity not available when trying to evaluate Brooklyn DSL"));
}
}

Expand Down Expand Up @@ -311,10 +321,11 @@ protected Maybe<Entity> callImpl(boolean immediate) throws Exception {
return Maybe.of(result.get());
}

// TODO may want to block and repeat on new entities joining?
throw new NoSuchElementException("No entity matching id " + desiredComponentId+
// could be nice if DSL has an extra .block() method to allow it to wait for a matching entity.
// previously we threw if nothing existed; now we return an absent with a detailed error
return Maybe.absent(new NoSuchElementException("No entity matching id " + desiredComponentId+
(scope==Scope.GLOBAL ? "" : ", in scope "+scope+" wrt "+entity+
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : "")));
(scopeComponent!=null ? " ("+scopeComponent+" from "+entity()+")" : ""))));
}

private ExecutionContext getExecutionContext() {
Expand Down Expand Up @@ -538,14 +549,11 @@ protected String resolveKeyName(boolean immediately) {

@Override
public final Maybe<Object> getImmediately() {
Maybe<Entity> targetEntityMaybe = component.getImmediately();
if (targetEntityMaybe.isAbsent()) return Maybe.absent("Target entity not available");
EntityInternal targetEntity = (EntityInternal) targetEntityMaybe.get();

String keyNameS = resolveKeyName(true);
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
Maybe<?> result = targetEntity.config().getNonBlocking(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
return Maybe.<Object>cast(result);
Maybe<Object> maybeWrappedMaybe = findExecutionContext(this).getImmediately(newCallableReturningImmediateMaybeOrNonImmediateValue(true));
// the answer will be wrapped twice due to the callable semantics;
// the inner present/absent is important; it will only get an outer absent if interrupted
if (maybeWrappedMaybe.isAbsent()) return maybeWrappedMaybe;
return Maybe.<Object>cast( (Maybe<?>) maybeWrappedMaybe.get() );
}

@Override
Expand All @@ -554,15 +562,55 @@ public Task<Object> newTask() {
.displayName("retrieving config for "+keyName)
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
.dynamic(false)
.body(new Callable<Object>() {
@Override
public Object call() throws Exception {
Entity targetEntity = component.get();
String keyNameS = resolveKeyName(true);
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
return targetEntity.getConfig(key != null ? key : ConfigKeys.newConfigKey(Object.class, keyNameS));
}})
.build();
.body(newCallableReturningImmediateMaybeOrNonImmediateValue(false)).build();
}

private Callable<Object> newCallableReturningImmediateMaybeOrNonImmediateValue(final boolean immediate) {
return new Callable<Object>() {
@Override
public Object call() throws Exception {
Entity targetEntity;
if (immediate) {
Maybe<Entity> targetEntityMaybe = component.getImmediately();
if (targetEntityMaybe.isAbsent()) return Maybe.<Object>cast(targetEntityMaybe);
targetEntity = (EntityInternal) targetEntityMaybe.get();
} else {
targetEntity = component.get();
}

// this is always run in a new dedicated task (possibly a fake task if immediate), so no need to clear
checkAndTagForRecursiveReference(targetEntity);

String keyNameS = resolveKeyName(true);
ConfigKey<?> key = targetEntity.getEntityType().getConfigKey(keyNameS);
if (key==null) key = ConfigKeys.newConfigKey(Object.class, keyNameS);
if (immediate) {
return ((EntityInternal)targetEntity).config().getNonBlocking(key);
} else {
return targetEntity.getConfig(key);
}
}
};
}

private void checkAndTagForRecursiveReference(Entity targetEntity) {
String tag = "DSL:entity('"+targetEntity.getId()+"').config('"+keyName+"')";
Task<?> ancestor = Tasks.current();
if (ancestor!=null) {
// don't check on ourself; only look higher in hierarchy;
// this assumes impls always spawn new tasks (which they do, just maybe not always in new threads)
// but it means it does not rely on tag removal to prevent weird errors,
// and more importantly it makes the strategy idempotent
ancestor = ancestor.getSubmittedByTask();
}
while (ancestor!=null) {
if (TaskTags.hasTag(ancestor, tag)) {
throw new IllegalStateException("Recursive config reference "+tag);
}
ancestor = ancestor.getSubmittedByTask();
}

Tasks.addTagDynamically(tag);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand All @@ -44,7 +50,6 @@

public class ConfigYamlTest extends AbstractYamlTest {

@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(ConfigYamlTest.class);

private ExecutorService executor;
Expand Down Expand Up @@ -91,6 +96,61 @@ public void testConfigInConfigBlock() throws Exception {
assertNull(entity.getMyField()); // field with @SetFromFlag
assertNull(entity.getMyField2()); // field with @SetFromFlag("myField2Alias"), set using alias
}


@Test
public void testRecursiveConfigFailsGracefully() throws Exception {
doTestRecursiveConfigFailsGracefully(false);
}

@Test
public void testRecursiveConfigImmediateFailsGracefully() throws Exception {
doTestRecursiveConfigFailsGracefully(true);
}

protected void doTestRecursiveConfigFailsGracefully(boolean immediate) throws Exception {
String yaml = Joiner.on("\n").join(
"services:",
"- type: org.apache.brooklyn.core.test.entity.TestEntity",
" brooklyn.config:",
" infinite_loop: $brooklyn:config(\"infinite_loop\")");

final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren());

Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Time.sleep(Duration.FIVE_SECONDS);
// error, loop wasn't interrupted or detected
LOG.warn("Timeout elapsed, destroying items; usage: "+
((LocalManagementContext)mgmt()).getGarbageCollector().getUsageString());
Entities.destroy(app);
} catch (RuntimeInterruptedException e) {
// expected on normal execution; clear the interrupted flag to prevent ugly further warnings being logged
Thread.interrupted();
}
}
});
t.start();
try {
String c;
if (immediate) {
// this should throw rather than return "absent", because the error is definitive (absent means couldn't resolve in time)
c = entity.config().getNonBlocking(ConfigKeys.newStringConfigKey("infinite_loop")).or("FAILED");
} else {
c = entity.config().get(ConfigKeys.newStringConfigKey("infinite_loop"));
}
Asserts.shouldHaveFailedPreviously("Expected recursive error, instead got: "+c);
} catch (Exception e) {
Asserts.expectedFailureContainsIgnoreCase(e, "infinite_loop", "recursive");
} finally {
if (!Entities.isManaged(app)) {
t.interrupt();
}
}
}

@Test
public void testConfigAtTopLevel() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.Identifiers;
import org.apache.brooklyn.util.time.Duration;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -296,14 +297,13 @@ public void testUrlEncode() throws Exception {
@Test
public void testEntityNotFound() throws Exception {
BrooklynDslDeferredSupplier<?> dsl = BrooklynDslCommon.entity("myIdDoesNotExist");
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
Assert.assertTrue(actualValue.isAbsent());
try {
Maybe<?> actualValue = execDslImmediately(dsl, Entity.class, app, true);
actualValue.get();
Asserts.shouldHaveFailedPreviously("actual="+actualValue);
} catch (Exception e) {
NoSuchElementException nsee = Exceptions.getFirstThrowableOfType(e, NoSuchElementException.class);
if (nsee == null) {
throw e;
}
Asserts.expectedFailureOfType(e, NoSuchElementException.class);
}
}

Expand Down Expand Up @@ -365,7 +365,7 @@ public DslTestWorker satisfiedAsynchronously(boolean val) {
return this;
}

@SuppressWarnings("unused") // included for completeness?
@SuppressWarnings("unused") // kept in case useful for additional tests, for completeness
public DslTestWorker wrapInTaskForImmediately(boolean val) {
wrapInTaskForImmediately = val;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.annotation.Nullable;

import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.config.ConfigInheritance;
import org.apache.brooklyn.config.ConfigInheritances;
Expand Down Expand Up @@ -231,7 +232,7 @@ public Maybe<Object> getConfigRaw(ConfigKey<?> key, boolean includeInherited) {
}

protected Object coerceConfigVal(ConfigKey<?> key, Object v) {
if ((v instanceof Future) || (v instanceof DeferredSupplier)) {
if ((v instanceof Future) || (v instanceof DeferredSupplier) || (v instanceof TaskFactory)) {
// no coercion for these (coerce on exit)
return v;
} else if (key instanceof StructuredConfigKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class BrooklynTaskTags extends TaskTags {
* and that it need not appear in some task lists;
* often used for framework lifecycle events and sensor polling */
public static final String TRANSIENT_TASK_TAG = "TRANSIENT";
/** marks that a task is meant to return immediately, without blocking (or if absolutely necessary blocking for a short while) */
public static final String IMMEDIATE_TASK_TAG = "IMMEDIATE";

// ------------- entity tags -------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ protected <T> Maybe<T> getNonBlockingResolvingSimple(ConfigKey<T> key) {
.immediately(true)
.deep(true)
.context(getContext())
.swallowExceptions()
.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably agree with this change, but don't feel confident about the full implications of it throwing the exception rather than returning the default value versus absent. In your test doTestRecursiveConfigFailsGracefully it certainly makes sense, but not sure what else this will affect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, the immediate stuff is quite new so i'd prefer to try this. if it's a problem we should perhaps look at wrapping in a ReferenceWithError as above, then caller will be forced to deal with the failure in the appropriate way.

return (resolved != marker)
? TypeCoercions.tryCoerce(resolved, key.getTypeToken())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedEntity;
import org.apache.brooklyn.core.mgmt.entitlement.Entitlements;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.task.ImmediateSupplier.ImmediateUnsupportedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;

/**
Expand Down Expand Up @@ -96,7 +99,55 @@ public ExecutionManager getExecutionManager() {
/** returns tasks started by this context (or tasks which have all the tags on this object) */
@Override
public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); }


/** performs execution without spawning a new task thread, though it does temporarily set a fake task for the purpose of getting context;
* currently supports {@link Supplier}, {@link Callable}, {@link Runnable}, or {@link Task} instances;
* with tasks if it is submitted or in progress,
* it fails if not completed; with unsubmitted, unqueued tasks, it gets the {@link Callable} job and
* uses that; with such a job, or any other callable/supplier/runnable, it runs that
* in an {@link InterruptingImmediateSupplier}, with as much metadata as possible (eg task name if
* given a task) set <i>temporarily</i> in the current thread context */
@SuppressWarnings("unchecked")
@Override
public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
BasicTask<?> fakeTaskForContext;
if (callableOrSupplier instanceof BasicTask) {
fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under what code path do we get here? Looking at ValueResolver.getMaybeInternal(...), allowImmediateExecution will be false if the object is a task (rather than a TaskFactory or DeferredSupplier).

For example, the test below fails (added to ValueResolverTest). It leaves a single instance of the task executing.

    public void testTaskGetImmediatelyDoesNotBlock() {
        final AtomicInteger executingCount = new AtomicInteger();
        
        final Task<String> task = new BasicTask<>(new Callable<String>() {
            public String call() {
                executingCount.incrementAndGet();
                try {
                    Time.sleep(Duration.ONE_MINUTE);
                    return "myval";
                } finally {
                    executingCount.decrementAndGet();
                }
            }});
        
        for (int i = 0; i < 3; i++) {
            Maybe<String> result = Tasks.resolving(task).as(String.class).context(app).immediately(true).getMaybe();
            Asserts.assertTrue(result.isAbsent(), "result="+result);
        }

        Asserts.assertFalse(task.isSubmitted());
        
        // The call below default times out after 30s while the task above is still running
        // Expect the task to not be left running.
        Asserts.succeedsEventually(new Runnable() {
            public void run() {
                Asserts.assertEquals(executingCount.get(), 0);
            }
        });
    }

I'm not convinced that we want to support setting a config key with a value of type Task (long term). But I guess that will require more work, to change how DependentConfiguration is implemented. So we do need to support it short-term :-(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I change ValueResolver.getMaybeInternal() to set allowImmediateExecution = true; when it is passed a TaskAdaptable, then the above test does pass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as noted above leaving the task running in background (or possibly not submitting it) is the right thing to do for immediate execution of a task; cancelling it is risky unless we know the task is dedicated to this one usage (hence preferring TaskFactory); in particular EntityConfigTest.testGetTaskNonBlockingKey() fails if allowImmediateExecution=true

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for the particular code block marked here, it is invoked from ValueResolver when you supply a TaskFactory; VR generates the Task and passes it in with allowImmediateExecution=true

if (fakeTaskForContext.isQueuedOrSubmitted()) {
if (fakeTaskForContext.isDone()) {
return Maybe.of((T)fakeTaskForContext.getUnchecked());
} else {
throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
}
}
callableOrSupplier = fakeTaskForContext.getJob();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very clever, getting the underlying Callable to execute from the job!

However, I wonder if this just reduces the chance of leaving tasks behind (rather than preventing it).

The test below fails when added to ValueResolverTest (when also including the change to ValueResolver.getMaybeInternal() to set allowImmediateExecution = true so that this code path is taken).

    public void testTaskGetImmediatelyDoesNotBlockWithNestedTasks() {
        final AtomicInteger executingCount = new AtomicInteger();
        
        final SequentialTask<?> outerTask = new SequentialTask<>(ImmutableList.of(new Callable<String>() {
            public String call() {
                executingCount.incrementAndGet();
                try {
                    Time.sleep(Duration.ONE_MINUTE);
                    return "myval";
                } finally {
                    executingCount.decrementAndGet();
                }
            }}));
        
        for (int i = 0; i < 3; i++) {
            Maybe<String> result = Tasks.resolving(outerTask).as(String.class).context(app).immediately(true).getMaybe();
            Asserts.assertTrue(result.isAbsent(), "result="+result);
        }
        
        Asserts.assertFalse(outerTask.isSubmitted());
        
        // the call below default times out after 30s while the task above is still running
        Asserts.succeedsEventually(new Runnable() {
            public void run() {
                Asserts.assertEquals(executingCount.get(), 0);
            }
        });
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

} else {
fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
}
fakeTaskForContext.tags.addAll(tags);
fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);

Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
BasicExecutionContext oldExecutionContext = getCurrentExecutionContext();
registerPerThreadExecutionContext();

if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
fakeTaskForContext.cancel();
try {
BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);

if (!(callableOrSupplier instanceof ImmediateSupplier)) {
callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
}
return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();

} finally {
BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
perThreadExecutionContext.set(oldExecutionContext);
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object task) {
Expand Down
Loading