From 50a5b4b94d3a12fcb9e53563bc5c2d1e63d244e8 Mon Sep 17 00:00:00 2001 From: Max Cai Date: Sun, 20 Nov 2016 07:26:01 +0100 Subject: [PATCH] Add "thenCheck()" to CompiledRepository. "thenCheck()" can be used after another 'then' directive, which normally generates the new value of the repository. Using a "thenCheck()" directive, the would-have-been new value becomes a mere "candidate new value", and can be vetoed by the check -- the clause that follows "thenCheck()" can skip the update, end with another value, or even have the flow continue onwards. The test cases use two demo-style repositories that perform a typical task of requesting a keyed item from the in-memory cache, then disk cache, then network, showcasing the real-life use of this new directive. --- .../android/agera/CompiledRepository.java | 27 +- .../android/agera/RepositoryCompiler.java | 85 +++-- .../agera/RepositoryCompilerStates.java | 90 ++++- .../agera/RepositoryThenCheckTest.java | 319 ++++++++++++++++++ 4 files changed, 483 insertions(+), 38 deletions(-) create mode 100644 agera/src/test/java/com/google/android/agera/RepositoryThenCheckTest.java diff --git a/agera/src/main/java/com/google/android/agera/CompiledRepository.java b/agera/src/main/java/com/google/android/agera/CompiledRepository.java index 72bb6ef..b4fed9b 100644 --- a/agera/src/main/java/com/google/android/agera/CompiledRepository.java +++ b/agera/src/main/java/com/google/android/agera/CompiledRepository.java @@ -277,6 +277,7 @@ private void checkRestartLocked() { private static final int BIND = 8; private static final int FILTER_SUCCESS = 9; private static final int FILTER_FAILURE = 10; + private static final int FILTER_FAILED_CHECK = 11; /** * @param asynchronously Whether this flow is run asynchronously. True after the first goTo and @@ -339,6 +340,9 @@ private void runFlowFrom(final int index, final boolean asynchronously) { case FILTER_FAILURE: i = runFilterFailure(directives, i); break; + case FILTER_FAILED_CHECK: + i = runFilterFailedCheck(directives, i); + break; case END: i = runEnd(directives, i); break; @@ -484,7 +488,7 @@ static void addFilterFailure(@NonNull final List directives) { private int runFilterFailure(@NonNull final Object[] directives, final int index) { final Result tryValue = (Result) intermediateValue; if (tryValue.succeeded()) { - runTerminate(tryValue.get(), identityFunction()); + setNewValueAndEndFlow(tryValue.get()); return -1; } else { intermediateValue = tryValue.getFailure(); @@ -492,6 +496,27 @@ private int runFilterFailure(@NonNull final Object[] directives, final int index } } + static void addFilterFailedCheck(@NonNull final Function caseFunction, + @NonNull final Predicate casePredicate, + @NonNull final List directives) { + directives.add(FILTER_FAILED_CHECK); + directives.add(caseFunction); + directives.add(casePredicate); + } + + private int runFilterFailedCheck(@NonNull final Object[] directives, final int index) { + final Function caseFunction = (Function) directives[index + 1]; + final Predicate casePredicate = (Predicate) directives[index + 2]; + + final Object caseValue = caseFunction.apply(intermediateValue); + if (casePredicate.apply(caseValue)) { + setNewValueAndEndFlow(intermediateValue); + return -1; + } else { + return index + 3; + } + } + private void runTerminate(@NonNull final Object caseValue, @Nullable final Function terminatingValueFunction) { if (terminatingValueFunction == null) { diff --git a/agera/src/main/java/com/google/android/agera/RepositoryCompiler.java b/agera/src/main/java/com/google/android/agera/RepositoryCompiler.java index f6d73bc..603a6e7 100644 --- a/agera/src/main/java/com/google/android/agera/RepositoryCompiler.java +++ b/agera/src/main/java/com/google/android/agera/RepositoryCompiler.java @@ -19,6 +19,7 @@ import static com.google.android.agera.CompiledRepository.addBindWith; import static com.google.android.agera.CompiledRepository.addCheck; import static com.google.android.agera.CompiledRepository.addEnd; +import static com.google.android.agera.CompiledRepository.addFilterFailedCheck; import static com.google.android.agera.CompiledRepository.addFilterFailure; import static com.google.android.agera.CompiledRepository.addFilterSuccess; import static com.google.android.agera.CompiledRepository.addGetFrom; @@ -48,7 +49,7 @@ final class RepositoryCompiler implements RepositoryCompilerStates.RFrequency, RepositoryCompilerStates.RFlow, RepositoryCompilerStates.RTerminationOrContinue, - RepositoryCompilerStates.RConfig { + RepositoryCompilerStates.RThenCheckOrConfig { private static final ThreadLocal compilers = new ThreadLocal<>(); @@ -75,7 +76,7 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) { @Retention(RetentionPolicy.SOURCE) @IntDef({NOTHING, FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE, FLOW, - TERMINATE_THEN_FLOW, TERMINATE_THEN_END, CONFIG}) + TERMINATE_THEN_FLOW, TERMINATE_THEN_END, THEN_CHECK_OR_CONFIG, CONFIG}) private @interface Expect {} private static final int NOTHING = 0; @@ -84,7 +85,8 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) { private static final int FLOW = 3; private static final int TERMINATE_THEN_FLOW = 4; private static final int TERMINATE_THEN_END = 5; - private static final int CONFIG = 6; + private static final int THEN_CHECK_OR_CONFIG = 6; + private static final int CONFIG = 7; private Object initialValue; private final ArrayList eventSources = new ArrayList<>(); @@ -95,6 +97,7 @@ private static void recycle(@NonNull final RepositoryCompiler compiler) { private Function caseExtractor; private Predicate casePredicate; private boolean goLazyUsed; + private boolean inThenCheckTerminationClause; private Merger notifyChecker = objectsUnequal(); @RepositoryConfig private int deactivationConfig; @@ -128,6 +131,20 @@ private void checkGoLazyUnused() { checkState(!goLazyUsed, "Unexpected occurrence of async directive after goLazy()"); } + private void checkExpectConfigAndEnsureEndFlow() { + if (expect == THEN_CHECK_OR_CONFIG) { + endFlow(false); + expect = CONFIG; + } else { + checkExpect(CONFIG); + } + } + + private void endFlow(final boolean skip) { + addEnd(skip, directives); + expect = CONFIG; + } + //region REventSource @NonNull @@ -226,6 +243,7 @@ public RepositoryCompiler bindWith(@NonNull final Supplier secondValueSupplier, @NonNull @Override public RepositoryCompiler thenSkip() { + checkExpect(FLOW); endFlow(true); return this; } @@ -234,7 +252,7 @@ public RepositoryCompiler thenSkip() { @Override public RepositoryCompiler thenGetFrom(@NonNull final Supplier supplier) { getFrom(supplier); - endFlow(false); + expect = THEN_CHECK_OR_CONFIG; return this; } @@ -243,7 +261,7 @@ public RepositoryCompiler thenGetFrom(@NonNull final Supplier supplier) { public RepositoryCompiler thenMergeIn( @NonNull final Supplier supplier, @NonNull final Merger merger) { mergeIn(supplier, merger); - endFlow(false); + expect = THEN_CHECK_OR_CONFIG; return this; } @@ -251,15 +269,10 @@ public RepositoryCompiler thenMergeIn( @Override public RepositoryCompiler thenTransform(@NonNull final Function function) { transform(function); - endFlow(false); + expect = THEN_CHECK_OR_CONFIG; return this; } - private void endFlow(final boolean skip) { - addEnd(skip, directives); - expect = CONFIG; - } - @NonNull @Override public RepositoryCompiler attemptGetFrom(@NonNull final Supplier attemptSupplier) { @@ -335,7 +348,7 @@ public RepositoryCompiler goLazy() { //endregion RFlow - //region RTermination + //region RTerminationOrContinue @NonNull @Override @@ -360,10 +373,12 @@ private void terminate(@Nullable final Function valueFunction) { } caseExtractor = null; casePredicate = null; - if (expect == TERMINATE_THEN_END) { + if (expect == TERMINATE_THEN_FLOW) { + expect = FLOW; + } else if (inThenCheckTerminationClause) { endFlow(false); } else { - expect = FLOW; + expect = THEN_CHECK_OR_CONFIG; } } @@ -371,19 +386,44 @@ private void terminate(@Nullable final Function valueFunction) { @Override public RepositoryCompiler orContinue() { checkExpect(TERMINATE_THEN_END); - addFilterFailure(directives); + if (inThenCheckTerminationClause) { + addFilterFailedCheck(caseExtractor, casePredicate, directives); + caseExtractor = null; + casePredicate = null; + inThenCheckTerminationClause = false; + } else { + addFilterFailure(directives); + } expect = FLOW; return this; } - //endregion RTermination + //endregion RTerminationOrContinue + + //region RThenCheckOrConfig + + @NonNull + @Override + public RepositoryCompiler thenCheck(@NonNull final Predicate predicate) { + return thenCheck(identityFunction(), predicate); + } - //region RConfig + @NonNull + @Override + public RepositoryCompiler thenCheck( + @NonNull final Function function, @NonNull final Predicate predicate) { + checkExpect(THEN_CHECK_OR_CONFIG); + caseExtractor = checkNotNull(function); + casePredicate = checkNotNull(predicate); + expect = TERMINATE_THEN_END; + inThenCheckTerminationClause = true; + return this; + } @NonNull @Override public RepositoryCompiler notifyIf(@NonNull final Merger notifyChecker) { - checkExpect(CONFIG); + checkExpectConfigAndEnsureEndFlow(); this.notifyChecker = checkNotNull(notifyChecker); return this; } @@ -391,7 +431,7 @@ public RepositoryCompiler notifyIf(@NonNull final Merger notifyChecker) { @NonNull @Override public RepositoryCompiler onDeactivation(@RepositoryConfig final int deactivationConfig) { - checkExpect(CONFIG); + checkExpectConfigAndEnsureEndFlow(); this.deactivationConfig = deactivationConfig; return this; } @@ -399,7 +439,7 @@ public RepositoryCompiler onDeactivation(@RepositoryConfig final int deactivatio @NonNull @Override public RepositoryCompiler onConcurrentUpdate(@RepositoryConfig final int concurrentUpdateConfig) { - checkExpect(CONFIG); + checkExpectConfigAndEnsureEndFlow(); this.concurrentUpdateConfig = concurrentUpdateConfig; return this; } @@ -407,7 +447,7 @@ public RepositoryCompiler onConcurrentUpdate(@RepositoryConfig final int concurr @NonNull @Override public RepositoryCompiler sendDiscardedValuesTo(@NonNull final Receiver disposer) { - checkExpect(CONFIG); + checkExpectConfigAndEnsureEndFlow(); discardedValueDisposer = checkNotNull(disposer); return this; } @@ -431,7 +471,7 @@ public RepositoryCompiler compileIntoRepositoryWithInitialValue(@NonNull final O @NonNull private Repository compileRepositoryAndReset() { - checkExpect(CONFIG); + checkExpectConfigAndEnsureEndFlow(); Repository repository = compiledRepository(initialValue, eventSources, frequency, directives, notifyChecker, concurrentUpdateConfig, deactivationConfig, discardedValueDisposer); expect = NOTHING; @@ -440,6 +480,7 @@ private Repository compileRepositoryAndReset() { frequency = 0; directives.clear(); goLazyUsed = false; + inThenCheckTerminationClause = false; notifyChecker = objectsUnequal(); deactivationConfig = RepositoryConfig.CONTINUE_FLOW; concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW; diff --git a/agera/src/main/java/com/google/android/agera/RepositoryCompilerStates.java b/agera/src/main/java/com/google/android/agera/RepositoryCompilerStates.java index a273eae..4ffe720 100644 --- a/agera/src/main/java/com/google/android/agera/RepositoryCompilerStates.java +++ b/agera/src/main/java/com/google/android/agera/RepositoryCompilerStates.java @@ -46,13 +46,15 @@ *
  • {@link RFlow#goTo goTo(e)} *
  • {@link RFlow#goLazy goLazy()} *
  • {@link RFlow#thenSkip thenSkip()} + *
  • {@link RThenCheckOrConfig#thenCheck(Predicate) thenCheck(p)}.term + *
  • {@link RThenCheckOrConfig#thenCheck(Function, Predicate) thenCheck(f, p)}.term * * where term (the termination clause) is one of: *
      *
    • {@link RTermination#orSkip orSkip()} *
    • {@link RTermination#orEnd orEnd(f)} + *
    • {@link RTerminationOrContinue#orContinue() orContinue()} (if directive starts with 'then') *
    - * */ public interface RepositoryCompilerStates { @@ -117,6 +119,11 @@ interface RFlow> @Override RFlow getFrom(@NonNull Supplier supplier); + @NonNull + @Override + RThenCheckOrConfig> thenGetFrom( + @NonNull Supplier supplier); + @NonNull @Override RTermination> attemptGetFrom( @@ -124,7 +131,7 @@ interface RFlow> @NonNull @Override - RTerminationOrContinue, + RTerminationOrContinue>, RFlow> thenAttemptGetFrom( @NonNull Supplier> attemptSupplier); @@ -133,6 +140,12 @@ interface RFlow> RFlow mergeIn(@NonNull Supplier supplier, @NonNull Merger merger); + @NonNull + @Override + RThenCheckOrConfig> thenMergeIn( + @NonNull Supplier supplier, + @NonNull Merger merger); + @NonNull @Override RTermination> attemptMergeIn( @@ -141,7 +154,7 @@ interface RFlow> @NonNull @Override - RTerminationOrContinue, + RTerminationOrContinue>, RFlow> thenAttemptMergeIn( @NonNull Supplier supplier, @NonNull Merger> @Override RFlow transform(@NonNull Function function); + @NonNull + @Override + RThenCheckOrConfig> thenTransform( + @NonNull Function function); + @NonNull @Override RTermination> attemptTransform( @@ -158,7 +176,7 @@ interface RFlow> @NonNull @Override - RTerminationOrContinue, + RTerminationOrContinue>, RFlow> thenAttemptTransform( @NonNull Function> attemptFunction); @@ -322,7 +340,8 @@ TSelf bindWith(@NonNull Supplier secondValueSupplier, * compiled repository, with notification if necessary. */ @NonNull - RConfig thenGetFrom(@NonNull Supplier supplier); + RThenCheckOrConfig> thenGetFrom( + @NonNull Supplier supplier); /** * Perform the {@link #attemptGetFrom} directive and use the successful output value as the new @@ -331,7 +350,8 @@ TSelf bindWith(@NonNull Supplier secondValueSupplier, * depending on the clause that follows. */ @NonNull - RTerminationOrContinue, + RTerminationOrContinue>, ? extends RSyncFlow> thenAttemptGetFrom( @NonNull Supplier> attemptSupplier); @@ -340,7 +360,8 @@ TSelf bindWith(@NonNull Supplier secondValueSupplier, * compiled repository, with notification if necessary. */ @NonNull - RConfig thenMergeIn(@NonNull Supplier supplier, + RThenCheckOrConfig> thenMergeIn( + @NonNull Supplier supplier, @NonNull Merger merger); /** @@ -350,7 +371,8 @@ RConfig thenMergeIn(@NonNull Supplier supplier, * depending on the clause that follows. */ @NonNull - RTerminationOrContinue, + RTerminationOrContinue>, ? extends RSyncFlow> thenAttemptMergeIn( @NonNull Supplier supplier, @NonNull Merger RConfig thenMergeIn(@NonNull Supplier supplier, * compiled repository, with notification if necessary. */ @NonNull - RConfig thenTransform( + RThenCheckOrConfig> thenTransform( @NonNull Function function); /** @@ -371,7 +393,8 @@ RConfig thenTransform( * depending on the clause that follows. */ @NonNull - RTerminationOrContinue, + RTerminationOrContinue>, ? extends RSyncFlow> thenAttemptTransform( @NonNull Function> attemptFunction); } @@ -403,7 +426,7 @@ interface RTermination { /** * Compiler state allowing to terminate or continue the data processing flow following a failed - * attempt to produce the new value of the repository. + * check at the previous breaking point of the data processing flow. * * @param Value type of the repository. * @param Value type from which to terminate the flow. @@ -415,15 +438,52 @@ interface RTerminationOrContinue extends RTermination { /** - * If the previous attempt failed, continue with the rest of the data processing flow, using the - * {@linkplain Result#getFailure() failure} as the input value to the next directive. Otherwise, - * end the data processing flow and use the successful output value from the attempt as the new - * value of the compiled repository, with notification if necessary. + * If the previous check failed, continue with the rest of the data processing flow. If the + * failed check was due to a failed attempt, then the next directive receives the + * {@linkplain Result#getFailure() failure} as its input. If the failed check was at a TODO + * If the previous check did not fail, end the data processing flow and use the last output + * value as the new value of the compiled repository, with notification if necessary. */ @NonNull TCon orContinue(); } + /** + * Compiler state allowing to perform a free-form check on a candidate new value produced by the + * previous {@code then}-directive, and resume the data processing flow if necessary or move on to + * configuration and ending the declaration of the repository. + * + * @param Repository value type. + * @param Compiler state to return to if the flow is to continue. + */ + interface RThenCheckOrConfig extends RConfig { + + /** + * Check the candidate new value with the given predicate. If the predicate applies, use it as + * the new value of the compiled repository, with notification if necessary. If the predicate + * does not apply, then either terminate the flow in a different way (skipping or using another + * value), or continue onto the next directive, depending on the clause that follows. The + * termination clause takes the candidate new value as its input. + */ + @NonNull + RTerminationOrContinue, TCon> thenCheck( + @NonNull Predicate predicate); + + /** + * Use the case-function to compute a case value out of the candidate new value and check it + * with the given predicate. If the predicate applies to the case value, use the candidate new + * value (not the case value) as the new value of the compiled repository, with notification if + * necessary. If the predicate does not apply to the case value, then either terminate the flow + * in a different way (skipping or using another value), or continue onto the next directive, + * depending on the clause that follows. The termination clause takes the case value as + * its input. + */ + @NonNull + RTerminationOrContinue, TCon> thenCheck( + @NonNull Function caseFunction, + @NonNull Predicate casePredicate); + } + /** * Compiler state allowing to configure and end the declaration of the repository. * diff --git a/agera/src/test/java/com/google/android/agera/RepositoryThenCheckTest.java b/agera/src/test/java/com/google/android/agera/RepositoryThenCheckTest.java new file mode 100644 index 0000000..061633b --- /dev/null +++ b/agera/src/test/java/com/google/android/agera/RepositoryThenCheckTest.java @@ -0,0 +1,319 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.agera; + +import android.net.Uri; +import android.support.annotation.NonNull; + +import com.google.android.agera.test.SingleSlotDelayedExecutor; +import com.google.android.agera.test.mocks.MockUpdatable; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.robolectric.RobolectricTestRunner; +import org.robolectric.annotation.Config; + +import java.io.IOException; +import java.net.SocketException; + +import static com.google.android.agera.Functions.staticFunction; +import static com.google.android.agera.Predicates.equalTo; +import static com.google.android.agera.Repositories.mutableRepository; +import static com.google.android.agera.Repositories.repositoryWithInitialValue; +import static com.google.android.agera.Result.absent; +import static com.google.android.agera.Result.failure; +import static com.google.android.agera.Result.present; +import static com.google.android.agera.Result.success; +import static com.google.android.agera.Suppliers.staticSupplier; +import static com.google.android.agera.test.matchers.SupplierGives.has; +import static com.google.android.agera.test.matchers.UpdatableUpdated.wasNotUpdated; +import static com.google.android.agera.test.mocks.MockUpdatable.mockUpdatable; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; +import static org.robolectric.annotation.Config.NONE; + +@Config(manifest = NONE) +@RunWith(RobolectricTestRunner.class) +public final class RepositoryThenCheckTest { + private static final char LOADING = 'L'; + private static final int REQUEST = 42; + private static final Supplier REQUEST_SUPPLIER = staticSupplier(REQUEST); + private static final String CACHE_KEY = "cacheKey"; + private static final char IN_MEMORY_CACHED_ITEM = 'i'; + private static final Result PRESENT_IN_MEMORY_RESULT = present(IN_MEMORY_CACHED_ITEM); + private static final Result ABSENT_IN_MEMORY_RESULT = absent(); + private static final char DISK_CACHED_FRESH_ITEM = 'f'; + private static final Result SUCCESSFUL_DISK_CACHED_FRESH_RESULT = + success(DISK_CACHED_FRESH_ITEM); + private static final char DISK_CACHED_STALE_ITEM = 's'; + private static final Result SUCCESSFUL_DISK_CACHED_STALE_RESULT = + success(DISK_CACHED_STALE_ITEM); + private static final Result FAILED_DISK_CACHE_RESULT = failure(new IOException()); + private static final char DISK_FAILURE = 'D'; + private static final Uri NETWORK_REQUEST_URI = Uri.parse("http://then.check"); + private static final char NETWORK_RESPONSE = 'r'; + private static final Result SUCCESSFUL_RESPONSE_RESULT = success(NETWORK_RESPONSE); + private static final Result FAILED_RESPONSE_RESULT = failure(new SocketException()); + private static final Result INVALID_RESPONSE_RESULT = failure(new RuntimeException()); + private static final char NETWORK_FAILURE = 'N'; + + private static final Predicate> WAS_PRESENT = + new Predicate>() { + @Override + public boolean apply(@NonNull final Result value) { + return value.isPresent(); + } + }; + private static final Function, Character> THE_SUCCESSFUL_DISK_CACHE_RESULT = + new Function, Character>() { + @NonNull + @Override + public Character apply(@NonNull final Result input) { + return input.orElse(DISK_FAILURE); + } + }; + private static final Predicate WAS_FRESH = equalTo(DISK_CACHED_FRESH_ITEM); + + private Repository repository; + private Repository> resultRepository; + private MockUpdatable updatable; + private SingleSlotDelayedExecutor diskIoExecutor; + private SingleSlotDelayedExecutor networkIoExecutor; + @Mock + private Function mockRequestToCacheKeyFunction; + @Mock + private Function> mockInMemoryCache; + @Mock + private Function> mockDiskCache; + @Mock + private Function mockRequestToNetworkRequestFunction; + @Mock + private Function> mockNetworkRequestFunction; + @Mock + private Predicate mockNetworkResponseValid; + + @Before + public void setUp() { + initMocks(this); + when(mockRequestToCacheKeyFunction.apply(REQUEST)).thenReturn(CACHE_KEY); + when(mockRequestToNetworkRequestFunction.apply(REQUEST)).thenReturn(NETWORK_REQUEST_URI); + + updatable = mockUpdatable(); + diskIoExecutor = new SingleSlotDelayedExecutor(); + networkIoExecutor = new SingleSlotDelayedExecutor(); + final MutableRepository currentRequestVariable = mutableRepository(0); + + // This repository can end with DISK_FAILURE and can skip updating the result if the network + // response is invalid. Tested combinations: + // - thenAttempt*().or*().thenCheck() + // - thenCheck().orContinue() + // - thenCheck().orSkip() + repository = repositoryWithInitialValue(LOADING) + .observe() + .onUpdatesPerLoop() + .getFrom(REQUEST_SUPPLIER) + .sendTo(currentRequestVariable) + .transform(mockRequestToCacheKeyFunction) + .thenAttemptTransform(mockInMemoryCache).orContinue() + .goTo(diskIoExecutor) + .getFrom(currentRequestVariable) + .transform(mockRequestToCacheKeyFunction) + .thenAttemptTransform(mockDiskCache).orEnd(staticFunction(DISK_FAILURE)) + .thenCheck(WAS_FRESH).orContinue() + .goTo(networkIoExecutor) + .getFrom(currentRequestVariable) + .transform(mockRequestToNetworkRequestFunction) + .thenAttemptTransform(mockNetworkRequestFunction).orEnd(staticFunction(NETWORK_FAILURE)) + .thenCheck(mockNetworkResponseValid).orSkip() + .compile(); + + // This repository will not end with disk failure, but can expose INVALID_RESPONSE_RESULT if + // the network response is invalid. Tested combinations: + // - then*().thenCheck() + // - thenCheck().orContinue() + // - thenCheck().orEnd() + resultRepository = repositoryWithInitialValue(Result.absent()) + .observe() + .onUpdatesPerLoop() + .getFrom(REQUEST_SUPPLIER) + .sendTo(currentRequestVariable) + .transform(mockRequestToCacheKeyFunction) + .thenTransform(mockInMemoryCache) + .thenCheck(WAS_PRESENT).orContinue() + .goTo(diskIoExecutor) + .getFrom(currentRequestVariable) + .transform(mockRequestToCacheKeyFunction) + .thenTransform(mockDiskCache) + .thenCheck(THE_SUCCESSFUL_DISK_CACHE_RESULT, WAS_FRESH).orContinue() + .goTo(networkIoExecutor) + .getFrom(currentRequestVariable) + .transform(mockRequestToNetworkRequestFunction) + .thenTransform(mockNetworkRequestFunction) + .thenCheck(mockNetworkResponseValid).orEnd(staticFunction(INVALID_RESPONSE_RESULT)) + .compile(); + } + + @After + public void tearDown() { + updatable.removeFromObservables(); + } + + @Test + public void shouldUseInMemoryCachedItemIfPresent() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(PRESENT_IN_MEMORY_RESULT); + + updatable.addToObservable(repository); + + assertThat(repository, has(IN_MEMORY_CACHED_ITEM)); + } + + @Test + public void shouldUseInMemoryCachedResultIfPresent() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(PRESENT_IN_MEMORY_RESULT); + + updatable.addToObservable(resultRepository); + + assertThat(resultRepository, has(PRESENT_IN_MEMORY_RESULT)); + } + + @Test + public void shouldUseDiskCachedItemIfFresh() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_FRESH_RESULT); + + updatable.addToObservable(repository); + diskIoExecutor.resumeOrThrow(); + + assertThat(repository, has(DISK_CACHED_FRESH_ITEM)); + } + + @Test + public void shouldUseDiskCachedResultIfFresh() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_FRESH_RESULT); + + updatable.addToObservable(resultRepository); + diskIoExecutor.resumeOrThrow(); + + assertThat(resultRepository, has(SUCCESSFUL_DISK_CACHED_FRESH_RESULT)); + } + + @Test + public void shouldReportDiskFailureItemIfDiskCacheFails() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(FAILED_DISK_CACHE_RESULT); + + updatable.addToObservable(repository); + diskIoExecutor.resumeOrThrow(); + + assertThat(repository, has(DISK_FAILURE)); + } + + @Test + public void shouldUseNetworkResponseItemIfCacheIsStale() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(SUCCESSFUL_RESPONSE_RESULT); + when(mockNetworkResponseValid.apply(NETWORK_RESPONSE)).thenReturn(true); + + updatable.addToObservable(repository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(repository, has(NETWORK_RESPONSE)); + } + + @Test + public void shouldUseNetworkResponseResultIfCacheIsStale() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(SUCCESSFUL_RESPONSE_RESULT); + when(mockNetworkResponseValid.apply(SUCCESSFUL_RESPONSE_RESULT)).thenReturn(true); + + updatable.addToObservable(resultRepository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(resultRepository, has(SUCCESSFUL_RESPONSE_RESULT)); + } + + @Test + public void shouldReportNetworkFailureItemIfNetworkFails() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(FAILED_RESPONSE_RESULT); + + updatable.addToObservable(repository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(repository, has(NETWORK_FAILURE)); + } + + @Test + public void shouldReportNetworkFailureResultIfNetworkFails() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(FAILED_RESPONSE_RESULT); + when(mockNetworkResponseValid.apply(FAILED_RESPONSE_RESULT)).thenReturn(true); + + updatable.addToObservable(resultRepository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(resultRepository, has(FAILED_RESPONSE_RESULT)); + } + + @Test + public void shouldSkipUpdateIfNetworkResponseIsInvalid() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(SUCCESSFUL_RESPONSE_RESULT); + when(mockNetworkResponseValid.apply(NETWORK_RESPONSE)).thenReturn(false); + + updatable.addToObservable(repository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(updatable, wasNotUpdated()); + assertThat(repository, has(LOADING)); + } + + @Test + public void shouldReportInvalidResponseResultIfNetworkResponseIsInvalid() { + when(mockInMemoryCache.apply(CACHE_KEY)).thenReturn(ABSENT_IN_MEMORY_RESULT); + when(mockDiskCache.apply(CACHE_KEY)).thenReturn(SUCCESSFUL_DISK_CACHED_STALE_RESULT); + when(mockNetworkRequestFunction.apply(NETWORK_REQUEST_URI)) + .thenReturn(SUCCESSFUL_RESPONSE_RESULT); + when(mockNetworkResponseValid.apply(SUCCESSFUL_RESPONSE_RESULT)).thenReturn(false); + + updatable.addToObservable(resultRepository); + diskIoExecutor.resumeOrThrow(); + networkIoExecutor.resumeOrThrow(); + + assertThat(resultRepository, has(INVALID_RESPONSE_RESULT)); + } +}