From 188461b14d5e62259078c5de2ff75a16a26498d5 Mon Sep 17 00:00:00 2001 From: Leumor <116955025+leumor@users.noreply.github.com> Date: Thu, 26 Feb 2026 20:25:27 +0000 Subject: [PATCH 1/2] fix(client-async): Harden USK datehint resume and cancel handling --- .../crypta/client/async/USKInserter.java | 307 ++++++++++++- .../crypta/client/async/USKInserterTest.java | 421 ++++++++++++++++-- 2 files changed, 691 insertions(+), 37 deletions(-) diff --git a/src/main/java/network/crypta/client/async/USKInserter.java b/src/main/java/network/crypta/client/async/USKInserter.java index b5aa02dbe0..ebe0810e94 100644 --- a/src/main/java/network/crypta/client/async/USKInserter.java +++ b/src/main/java/network/crypta/client/async/USKInserter.java @@ -6,6 +6,7 @@ import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import network.crypta.client.InsertContext; import network.crypta.client.InsertException.InsertExceptionMode; import network.crypta.client.InsertException; @@ -125,6 +126,12 @@ public final class USKInserter /** After this many attempted slots without success, fall back to re-fetch the latest. */ private static final long MAX_TRIED_SLOTS = 10; + /** Maximum time to wait for USK datehint child inserts to make progress before retrying. */ + private static final long DATEHINT_STALL_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(15); + + /** Retry stalled USK datehint phase at most this many times before surfacing failure. */ + private static final int MAX_DATEHINT_STALL_RETRIES = 1; + /** If true, frees {@link #data} after completion or terminal failure. */ private final boolean freeData; @@ -140,6 +147,216 @@ public final class USKInserter /** Optional override for the crypto key material; {@code null} to auto-generate. */ final byte[] forceCryptoKey; + /** Active datehint insert phase; null when not currently inserting datehints. */ + private transient DateHintPhase activeDateHintPhase; + + /** Monotonic identifier for datehint phases; helps ignore stale callbacks. */ + private transient long nextDateHintPhaseId = 1; + + /** + * Runtime-only state for one active datehint insert phase. + * + *
This state is intentionally transient and rebuilt by the terminal callback's {@link + * PutCompletionCallback#onResume(ClientContext)} path after restart. + */ + private static final class DateHintPhase { + final long phaseId; + final int retryCount; + final DateHintTerminalCallback terminalCallback; + long lastProgressAtMillis; + boolean watchdogCancelIssued; + + DateHintPhase(long phaseId, int retryCount, DateHintTerminalCallback terminalCallback) { + this.phaseId = phaseId; + this.retryCount = retryCount; + this.terminalCallback = terminalCallback; + this.lastProgressAtMillis = System.currentTimeMillis(); + this.watchdogCancelIssued = false; + } + } + + /** + * Terminal callback for a datehint phase. + * + *
Delegates normal notifications to {@link #cb}, while intercepting terminal success/failure
+ * so we can retry a stalled phase once when the watchdog cancels it for lack of progress.
+ */
+ private final class DateHintTerminalCallback implements PutCompletionCallback, Serializable {
+ @Serial private static final long serialVersionUID = 1L;
+
+ private final long phaseId;
+ private final long edition;
+ private final int retryCount;
+ private MultiPutCompletionCallback group;
+ private transient volatile boolean awaitingPhaseRestore;
+ private transient volatile boolean completedBeforePhaseRestore;
+
+ DateHintTerminalCallback(long phaseId, long edition, int retryCount) {
+ this.phaseId = phaseId;
+ this.edition = edition;
+ this.retryCount = retryCount;
+ this.awaitingPhaseRestore = false;
+ }
+
+ void bindGroup(MultiPutCompletionCallback group) {
+ this.group = group;
+ }
+
+ void cancelGroup(ClientContext context) {
+ MultiPutCompletionCallback localGroup = group;
+ if (localGroup != null) localGroup.cancel(context);
+ }
+
+ private boolean isParentRequestCancelled() {
+ BaseClientPutter parentPutter = parent;
+ return parentPutter == null || parentPutter.isCancelled();
+ }
+
+ private PutCompletionCallback getCompletionCallbackOrNull(String eventName) {
+ PutCompletionCallback callback = cb;
+ if (callback == null && LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Dropping datehint {} callback for {} phase {} because completion callback is null",
+ eventName,
+ USKInserter.this,
+ phaseId);
+ }
+ return callback;
+ }
+
+ private void markDateHintProgress(long phaseId) {
+ synchronized (USKInserter.this) {
+ if (activeDateHintPhase == null || activeDateHintPhase.phaseId != phaseId) return;
+ activeDateHintPhase.lastProgressAtMillis = System.currentTimeMillis();
+ }
+ }
+
+ private void onDateHintPhaseResumed(ClientContext context) {
+ synchronized (USKInserter.this) {
+ DateHintPhase current = activeDateHintPhase;
+ if (current == null || current.phaseId != phaseId) {
+ activeDateHintPhase = new DateHintPhase(phaseId, retryCount, this);
+ nextDateHintPhaseId = Math.max(nextDateHintPhaseId, phaseId + 1);
+ } else {
+ current.lastProgressAtMillis = System.currentTimeMillis();
+ }
+ }
+ scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS);
+ }
+
+ private void onDateHintPhaseFinished(
+ ClientPutState completedState, InsertException failure, ClientContext context) {
+ boolean retryOnStallCancel;
+ synchronized (USKInserter.this) {
+ DateHintPhase phase = activeDateHintPhase;
+ if (phase == null) {
+ // Resume-order race: MultiPut resumes children before callback.onResume() rebuilds phase.
+ if (!awaitingPhaseRestore || completedBeforePhaseRestore) return;
+ retryOnStallCancel = false;
+ completedBeforePhaseRestore = true;
+ } else {
+ if (phase.phaseId != phaseId) return;
+ retryOnStallCancel =
+ failure != null
+ && phase.watchdogCancelIssued
+ && failure.getMode() == InsertExceptionMode.CANCELLED
+ && retryCount < MAX_DATEHINT_STALL_RETRIES
+ && !isParentRequestCancelled();
+ activeDateHintPhase = null;
+ }
+ }
+ if (retryOnStallCancel) {
+ LOG.warn(
+ "Retrying USK datehint insert phase for {} edition {} after watchdog cancel (attempt {}"
+ + " of {})",
+ USKInserter.this,
+ edition,
+ retryCount + 1,
+ MAX_DATEHINT_STALL_RETRIES);
+ startDateHintInsertPhase(context, edition, retryCount + 1, completedState);
+ return;
+ }
+ PutCompletionCallback callback =
+ getCompletionCallbackOrNull(failure != null ? "onFailure" : "onSuccess");
+ if (callback == null) return;
+ if (failure != null) callback.onFailure(failure, completedState, context);
+ else callback.onSuccess(completedState, context);
+ }
+
+ @Override
+ public void onSuccess(ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ onDateHintPhaseFinished(state, null, context);
+ }
+
+ @Override
+ public void onFailure(InsertException e, ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ onDateHintPhaseFinished(state, e, context);
+ }
+
+ @Override
+ public void onEncode(BaseClientKey usk, ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onEncode");
+ if (callback != null) callback.onEncode(usk, state, context);
+ }
+
+ @Override
+ public void onTransition(
+ ClientPutState oldState, ClientPutState newState, ClientContext context) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onTransition");
+ if (callback != null) callback.onTransition(oldState, newState, context);
+ }
+
+ @Override
+ public void onMetadata(Metadata m, ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataMetadata");
+ if (callback != null) callback.onMetadata(m, state, context);
+ }
+
+ @Override
+ public void onMetadata(Bucket meta, ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataBucket");
+ if (callback != null) callback.onMetadata(meta, state, context);
+ }
+
+ @Override
+ public void onFetchable(ClientPutState state) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onFetchable");
+ if (callback != null) callback.onFetchable(state);
+ }
+
+ @Override
+ public void onBlockSetFinished(ClientPutState state, ClientContext context) {
+ markDateHintProgress(phaseId);
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onBlockSetFinished");
+ if (callback != null) callback.onBlockSetFinished(state, context);
+ }
+
+ @Override
+ public void onResume(ClientContext context) throws InsertException, ResumeFailedException {
+ if (!completedBeforePhaseRestore) {
+ onDateHintPhaseResumed(context);
+ }
+ awaitingPhaseRestore = false;
+ PutCompletionCallback callback = getCompletionCallbackOrNull("onResume");
+ if (callback != null && callback != parent) callback.onResume(context);
+ }
+
+ @Serial
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ awaitingPhaseRestore = true;
+ completedBeforePhaseRestore = false;
+ }
+ }
+
/**
* Starts the asynchronous USK insert process.
*
@@ -264,11 +481,26 @@ private void insertSucceeded(ClientContext context, long edition) {
cb.onSuccess(this, context);
return;
}
+ startDateHintInsertPhase(context, edition, 0, this);
+ }
+
+ private void startDateHintInsertPhase(
+ ClientContext context, long edition, int retryCount, ClientPutState transitionFrom) {
if (LOG.isDebugEnabled())
- LOG.debug("Inserted to edition {} - inserting USK date hints...", edition);
+ LOG.debug(
+ "Inserted to edition {} - inserting USK date hints (retry {} of {})...",
+ edition,
+ retryCount,
+ MAX_DATEHINT_STALL_RETRIES);
USKDateHint hint = USKDateHint.now();
+ long phaseId = reserveDateHintPhaseId();
+ DateHintTerminalCallback terminalCallback =
+ new DateHintTerminalCallback(phaseId, edition, retryCount);
MultiPutCompletionCallback m =
- new MultiPutCompletionCallback(cb, parent, tokenObject, persistent, true);
+ new MultiPutCompletionCallback(terminalCallback, parent, tokenObject, persistent, true);
+ terminalCallback.bindGroup(m);
+ activateDateHintPhase(new DateHintPhase(phaseId, retryCount, terminalCallback));
+
byte[] hintData = hint.getData(edition).getBytes(StandardCharsets.UTF_8);
FreenetURI[] hintURIs = hint.getInsertURIs(privUSK);
boolean added = false;
@@ -290,6 +522,7 @@ private void insertSucceeded(ClientContext context, long edition) {
} catch (IOException e) {
LOG.error("Unable to insert USK date hints due to disk I/O error: {}", e, e);
if (!added) {
+ clearActiveDateHintPhase(phaseId);
cb.onFailure(
new InsertException(
InsertExceptionMode.BUCKET_ERROR, e, pubUSK.getSSK(edition).getURI()),
@@ -300,13 +533,74 @@ private void insertSucceeded(ClientContext context, long edition) {
} catch (InsertException e) {
LOG.error("Unable to insert USK date hints due to error: {}", e, e);
if (!added) {
+ clearActiveDateHintPhase(phaseId);
cb.onFailure(e, this, context);
return;
} // Else try to insert the other hints.
}
}
- cb.onTransition(this, m, context);
+ if (!added) {
+ clearActiveDateHintPhase(phaseId);
+ cb.onSuccess(this, context);
+ return;
+ }
+ cb.onTransition(transitionFrom, m, context);
m.arm(context);
+ scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS);
+ }
+
+ private synchronized long reserveDateHintPhaseId() {
+ return nextDateHintPhaseId++;
+ }
+
+ private synchronized void activateDateHintPhase(DateHintPhase phase) {
+ activeDateHintPhase = phase;
+ nextDateHintPhaseId = Math.max(nextDateHintPhaseId, phase.phaseId + 1);
+ }
+
+ private synchronized void clearActiveDateHintPhase(long phaseId) {
+ if (activeDateHintPhase != null && activeDateHintPhase.phaseId == phaseId) {
+ activeDateHintPhase = null;
+ }
+ }
+
+ private void scheduleDateHintWatchdog(ClientContext context, long phaseId, long delayMillis) {
+ if (context.ticker == null) return;
+ long boundedDelay = Math.max(1L, delayMillis);
+ context.ticker.queueTimedJob(() -> runDateHintWatchdog(context, phaseId), boundedDelay);
+ }
+
+ private void runDateHintWatchdog(ClientContext context, long phaseId) {
+ DateHintTerminalCallback callbackToCancel = null;
+ long rescheduleDelay = -1L;
+ long stalledFor;
+ int retryCount = 0;
+ synchronized (this) {
+ DateHintPhase phase = activeDateHintPhase;
+ if (phase == null || phase.phaseId != phaseId) return;
+ long now = System.currentTimeMillis();
+ stalledFor = now - phase.lastProgressAtMillis;
+ if (stalledFor < DATEHINT_STALL_TIMEOUT_MILLIS) {
+ rescheduleDelay = DATEHINT_STALL_TIMEOUT_MILLIS - stalledFor;
+ } else if (!phase.watchdogCancelIssued) {
+ phase.watchdogCancelIssued = true;
+ callbackToCancel = phase.terminalCallback;
+ retryCount = phase.retryCount;
+ }
+ }
+ if (rescheduleDelay > 0L) {
+ scheduleDateHintWatchdog(context, phaseId, rescheduleDelay);
+ return;
+ }
+ if (callbackToCancel == null) return;
+ LOG.warn(
+ "USK datehint insert phase {} stalled for {} ms on {} (retry {} of {}) - cancelling phase",
+ phaseId,
+ stalledFor,
+ this,
+ retryCount,
+ MAX_DATEHINT_STALL_RETRIES);
+ callbackToCancel.cancelGroup(context);
}
private void scheduleInsert(ClientContext context) {
@@ -530,6 +824,7 @@ public BaseClientPutter getParent() {
public void cancel(ClientContext context) {
USKFetcherTag tag;
SingleBlockInserter localSbi;
+ DateHintTerminalCallback localDateHintCallback;
synchronized (this) {
if (finished) return;
finished = true;
@@ -537,6 +832,9 @@ public void cancel(ClientContext context) {
fetcher = null;
localSbi = sbi;
sbi = null;
+ localDateHintCallback =
+ (activeDateHintPhase == null) ? null : activeDateHintPhase.terminalCallback;
+ activeDateHintPhase = null;
}
if (tag != null) {
tag.cancel(context);
@@ -544,6 +842,9 @@ public void cancel(ClientContext context) {
if (localSbi != null) {
localSbi.cancel(context); // will call onFailure, which will removeFrom()
}
+ if (localDateHintCallback != null) {
+ localDateHintCallback.cancelGroup(context);
+ }
if (freeData) {
Bucket dataToFree;
synchronized (this) {
diff --git a/src/test/java/network/crypta/client/async/USKInserterTest.java b/src/test/java/network/crypta/client/async/USKInserterTest.java
index b82507f63d..9d339643ec 100644
--- a/src/test/java/network/crypta/client/async/USKInserterTest.java
+++ b/src/test/java/network/crypta/client/async/USKInserterTest.java
@@ -5,6 +5,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Serial;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import network.crypta.client.FetchContext;
@@ -21,6 +24,7 @@
import network.crypta.keys.USK;
import network.crypta.node.ClientContextResources;
import network.crypta.support.PriorityAwareExecutor;
+import network.crypta.support.Ticker;
import network.crypta.support.api.Bucket;
import org.jspecify.annotations.NonNull;
import org.junit.jupiter.api.BeforeEach;
@@ -33,10 +37,13 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -133,38 +140,7 @@ public void close() {
@BeforeEach
void setup() {
- // Minimal ClientContext: only fields touched by these tests are used.
- // Many collaborators are irrelevant here and can be passed as null or simple stubs.
- context =
- new ClientContext(
- 1L,
- new ClientContextRuntime(
- Mockito.mock(ClientLayerPersister.class),
- new InlineExecutor(),
- null,
- null,
- new DummyRandomSource(),
- new SecureRandom(),
- null),
- new ClientContextStorageFactories(null, null, null, null, null, null, null),
- new ClientContextRafFactories(null, null),
- new ClientContextServices(
- new ClientContextResources(null, null), uskManager, null, null, null, null),
- new ClientContextDefaults(
- // Default persistent contexts used only when building fetchers internally; not
- // exercised in these tests.
- new FetchContext(
- FetchContextOptions.builder()
- .limits(0, 0, 0)
- .archiveLimits(1, 0, 0, true)
- .retryLimits(0, 0, 0)
- .splitfileLimits(false, 0, 0)
- .behavior(false, false, false)
- .clientOptions(new SimpleEventProducer(), false, false)
- .filterOverrides(null, null, null)
- .build()),
- newInsertContext(),
- null));
+ context = newContext(null);
}
// Helpers in tests create per-test SSKs to build consistent public/insertable USK URIs.
@@ -253,6 +229,40 @@ public String toString() {
};
}
+ private ClientContext newContext(Ticker ticker) {
+ // Minimal ClientContext: only fields touched by these tests are used.
+ // Many collaborators are irrelevant here and can be passed as null or simple stubs.
+ return new ClientContext(
+ 1L,
+ new ClientContextRuntime(
+ Mockito.mock(ClientLayerPersister.class),
+ new InlineExecutor(),
+ null,
+ ticker,
+ new DummyRandomSource(),
+ new SecureRandom(),
+ null),
+ new ClientContextStorageFactories(null, null, null, null, null, null, null),
+ new ClientContextRafFactories(null, null),
+ new ClientContextServices(
+ new ClientContextResources(null, null), uskManager, null, null, null, null),
+ new ClientContextDefaults(
+ // Default persistent contexts used only when building fetchers internally; not
+ // exercised in these tests.
+ new FetchContext(
+ FetchContextOptions.builder()
+ .limits(0, 0, 0)
+ .archiveLimits(1, 0, 0, true)
+ .retryLimits(0, 0, 0)
+ .splitfileLimits(false, 0, 0)
+ .behavior(false, false, false)
+ .clientOptions(new SimpleEventProducer(), false, false)
+ .filterOverrides(null, null, null)
+ .build()),
+ newInsertContext(),
+ null));
+ }
+
private USKInserter newInserter(
Bucket data,
short compressionCodec,
@@ -279,6 +289,71 @@ private USKInserter newInserter(
new BlockInsertOptions(cfg.persistent, cfg.realTimeFlag, cfg.freeData, 0));
}
+ private static Object newDateHintTerminalCallback(
+ USKInserter inserter, long phaseId, long edition) throws Exception {
+ Class> callbackClass =
+ Class.forName(USKInserter.class.getName() + "$DateHintTerminalCallback");
+ Constructor> ctor =
+ callbackClass.getDeclaredConstructor(USKInserter.class, long.class, long.class, int.class);
+ ctor.setAccessible(true);
+ return ctor.newInstance(inserter, phaseId, edition, 0);
+ }
+
+ private static Object newDateHintPhase(long phaseId, Object callback) throws Exception {
+ Class> callbackClass =
+ Class.forName(USKInserter.class.getName() + "$DateHintTerminalCallback");
+ Class> phaseClass = Class.forName(USKInserter.class.getName() + "$DateHintPhase");
+ Constructor> ctor = phaseClass.getDeclaredConstructor(long.class, int.class, callbackClass);
+ ctor.setAccessible(true);
+ return ctor.newInstance(phaseId, 0, callback);
+ }
+
+ private static void setDateHintCallbackGroup(Object callback, MultiPutCompletionCallback group)
+ throws Exception {
+ Field f = callback.getClass().getDeclaredField("group");
+ f.setAccessible(true);
+ f.set(callback, group);
+ }
+
+ private static void setDateHintLastProgressAtMillis(Object phase, long value) throws Exception {
+ Field f = phase.getClass().getDeclaredField("lastProgressAtMillis");
+ f.setAccessible(true);
+ f.setLong(phase, value);
+ }
+
+ private static boolean isDateHintWatchdogCancelIssued(Object phase) throws Exception {
+ Field f = phase.getClass().getDeclaredField("watchdogCancelIssued");
+ f.setAccessible(true);
+ return f.getBoolean(phase);
+ }
+
+ private static void setDateHintWatchdogCancelIssued(Object phase) throws Exception {
+ Field f = phase.getClass().getDeclaredField("watchdogCancelIssued");
+ f.setAccessible(true);
+ f.setBoolean(phase, true);
+ }
+
+ private static void setActiveDateHintPhase(USKInserter inserter, Object phase) throws Exception {
+ Field activeField = USKInserter.class.getDeclaredField("activeDateHintPhase");
+ activeField.setAccessible(true);
+ activeField.set(inserter, phase);
+ }
+
+ private static void setDateHintTerminalAwaitingPhaseRestore(
+ PutCompletionCallback callback, boolean inProgress) throws Exception {
+ Field restoreField = callback.getClass().getDeclaredField("awaitingPhaseRestore");
+ restoreField.setAccessible(true);
+ restoreField.setBoolean(callback, inProgress);
+ }
+
+ private static void runDateHintWatchdog(USKInserter inserter, ClientContext context, long phaseId)
+ throws Exception {
+ Method watchdog =
+ USKInserter.class.getDeclaredMethod("runDateHintWatchdog", ClientContext.class, long.class);
+ watchdog.setAccessible(true);
+ watchdog.invoke(inserter, context, phaseId);
+ }
+
@Test
void schedule_whenCalled_schedulesFetcher() throws Exception {
// Arrange
@@ -516,6 +591,286 @@ void cancel_whenActive_cancelsFetcher_freesData_andEmitsCancelledFailure() throw
assertEquals(InsertExceptionMode.CANCELLED, ex.getValue().getMode());
}
+ @Test
+ void runDateHintWatchdog_whenPhaseStalled_cancelsActiveDateHintGroup() throws Exception {
+ // Arrange
+ byte[] bytes = "stalled-datehint".getBytes(StandardCharsets.UTF_8);
+ Bucket data = makeBucket(bytes);
+ String site = "watchdog-stall";
+ long edition = 5L;
+ long phaseId = 91L;
+ InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site);
+ FreenetURI insertUri =
+ new FreenetURI(
+ "USK",
+ site,
+ null,
+ ssk.getInsertURI().getRoutingKey(),
+ ssk.getInsertURI().getCryptoKey(),
+ ssk.getInsertURI().getExtra(),
+ edition);
+ InsertContext ic = newInsertContext();
+
+ USKInserter inserter =
+ newInserter(
+ data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false));
+
+ MultiPutCompletionCallback group = Mockito.mock(MultiPutCompletionCallback.class);
+ Object callback = newDateHintTerminalCallback(inserter, phaseId, edition);
+ setDateHintCallbackGroup(callback, group);
+ Object phase = newDateHintPhase(phaseId, callback);
+ setDateHintLastProgressAtMillis(phase, 0L);
+ setActiveDateHintPhase(inserter, phase);
+
+ // Act
+ runDateHintWatchdog(inserter, context, phaseId);
+
+ // Assert
+ verify(group, times(1)).cancel(context);
+ assertTrue(isDateHintWatchdogCancelIssued(phase));
+ }
+
+ @Test
+ void runDateHintWatchdog_whenRecentProgress_reschedulesWithoutCancelling() throws Exception {
+ // Arrange
+ Ticker ticker = Mockito.mock(Ticker.class);
+ ClientContext contextWithTicker = newContext(ticker);
+ byte[] bytes = "recent-datehint".getBytes(StandardCharsets.UTF_8);
+ Bucket data = makeBucket(bytes);
+ String site = "watchdog-reschedule";
+ long edition = 9L;
+ long phaseId = 92L;
+ InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site);
+ FreenetURI insertUri =
+ new FreenetURI(
+ "USK",
+ site,
+ null,
+ ssk.getInsertURI().getRoutingKey(),
+ ssk.getInsertURI().getCryptoKey(),
+ ssk.getInsertURI().getExtra(),
+ edition);
+ InsertContext ic = newInsertContext();
+
+ USKInserter inserter =
+ newInserter(
+ data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false));
+
+ MultiPutCompletionCallback group = Mockito.mock(MultiPutCompletionCallback.class);
+ Object callback = newDateHintTerminalCallback(inserter, phaseId, edition);
+ setDateHintCallbackGroup(callback, group);
+ Object phase = newDateHintPhase(phaseId, callback);
+ setDateHintLastProgressAtMillis(phase, System.currentTimeMillis());
+ setActiveDateHintPhase(inserter, phase);
+
+ // Act
+ runDateHintWatchdog(inserter, contextWithTicker, phaseId);
+
+ // Assert
+ verify(group, never()).cancel(any(ClientContext.class));
+ verify(ticker, times(1))
+ .queueTimedJob(any(Runnable.class), Mockito.longThat(delay -> delay > 0L));
+ assertFalse(isDateHintWatchdogCancelIssued(phase));
+ }
+
+ @Test
+ void dateHintTerminalCallback_onSuccess_forwardsTransitionedStateToParentCallback()
+ throws Exception {
+ // Arrange
+ byte[] bytes = "datehint-success".getBytes(StandardCharsets.UTF_8);
+ Bucket data = makeBucket(bytes);
+ String site = "datehint-success-site";
+ long edition = 4L;
+ long phaseId = 301L;
+ InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site);
+ FreenetURI insertUri =
+ new FreenetURI(
+ "USK",
+ site,
+ null,
+ ssk.getInsertURI().getRoutingKey(),
+ ssk.getInsertURI().getCryptoKey(),
+ ssk.getInsertURI().getExtra(),
+ edition);
+ InsertContext ic = newInsertContext();
+ USKInserter inserter =
+ newInserter(
+ data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false));
+
+ PutCompletionCallback terminalCallback =
+ (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition);
+ Object phase = newDateHintPhase(phaseId, terminalCallback);
+ setActiveDateHintPhase(inserter, phase);
+
+ ClientPutState transitionedState = Mockito.mock(ClientPutState.class);
+
+ // Act
+ terminalCallback.onSuccess(transitionedState, context);
+
+ // Assert
+ ArgumentCaptor