diff --git a/src/main/java/network/crypta/client/async/USKInserter.java b/src/main/java/network/crypta/client/async/USKInserter.java index b5aa02dbe0..13ce9ba6c8 100644 --- a/src/main/java/network/crypta/client/async/USKInserter.java +++ b/src/main/java/network/crypta/client/async/USKInserter.java @@ -6,6 +6,7 @@ import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import network.crypta.client.InsertContext; import network.crypta.client.InsertException.InsertExceptionMode; import network.crypta.client.InsertException; @@ -125,6 +126,12 @@ public final class USKInserter /** After this many attempted slots without success, fall back to re-fetch the latest. */ private static final long MAX_TRIED_SLOTS = 10; + /** Maximum time to wait for USK datehint child inserts to make progress before retrying. */ + private static final long DATEHINT_STALL_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(15); + + /** Retry stalled USK datehint phase at most this many times before surfacing failure. */ + private static final int MAX_DATEHINT_STALL_RETRIES = 1; + /** If true, frees {@link #data} after completion or terminal failure. */ private final boolean freeData; @@ -140,6 +147,222 @@ public final class USKInserter /** Optional override for the crypto key material; {@code null} to auto-generate. */ final byte[] forceCryptoKey; + /** Active datehint insert phase; null when not currently inserting datehints. */ + private transient DateHintPhase activeDateHintPhase; + + /** Monotonic identifier for datehint phases; helps ignore stale callbacks. */ + private transient long nextDateHintPhaseId = 1; + + /** + * Runtime-only state for one active datehint insert phase. + * + *

This state is intentionally transient and rebuilt by the terminal callback's {@link + * PutCompletionCallback#onResume(ClientContext)} path after restart. + */ + private static final class DateHintPhase { + final long phaseId; + final int retryCount; + final DateHintTerminalCallback terminalCallback; + long lastProgressAtMillis; + boolean watchdogCancelIssued; + + DateHintPhase(long phaseId, int retryCount, DateHintTerminalCallback terminalCallback) { + this.phaseId = phaseId; + this.retryCount = retryCount; + this.terminalCallback = terminalCallback; + this.lastProgressAtMillis = System.currentTimeMillis(); + this.watchdogCancelIssued = false; + } + } + + /** + * Terminal callback for a datehint phase. + * + *

Delegates normal notifications to {@link #cb}, while intercepting terminal success/failure + * so we can retry a stalled phase once when the watchdog cancels it for lack of progress. + */ + private final class DateHintTerminalCallback implements PutCompletionCallback, Serializable { + @Serial private static final long serialVersionUID = 1L; + + private final long phaseId; + private final long edition; + private final int retryCount; + private MultiPutCompletionCallback group; + private volatile boolean watchdogCancelIssued; + private transient volatile boolean awaitingPhaseRestore; + private transient volatile boolean completedBeforePhaseRestore; + + DateHintTerminalCallback(long phaseId, long edition, int retryCount) { + this.phaseId = phaseId; + this.edition = edition; + this.retryCount = retryCount; + this.watchdogCancelIssued = false; + this.awaitingPhaseRestore = false; + } + + void bindGroup(MultiPutCompletionCallback group) { + this.group = group; + } + + void cancelGroup(ClientContext context) { + MultiPutCompletionCallback localGroup = group; + if (localGroup != null) localGroup.cancel(context); + } + + private boolean isParentRequestCancelled() { + BaseClientPutter parentPutter = parent; + return parentPutter == null || parentPutter.isCancelled(); + } + + private PutCompletionCallback getCompletionCallbackOrNull(String eventName) { + PutCompletionCallback callback = cb; + if (callback == null && LOG.isDebugEnabled()) { + LOG.debug( + "Dropping datehint {} callback for {} phase {} because completion callback is null", + eventName, + USKInserter.this, + phaseId); + } + return callback; + } + + private void markDateHintProgress(long phaseId) { + synchronized (USKInserter.this) { + if (activeDateHintPhase == null || activeDateHintPhase.phaseId != phaseId) return; + activeDateHintPhase.lastProgressAtMillis = System.currentTimeMillis(); + } + } + + private void onDateHintPhaseResumed(ClientContext context) { + synchronized (USKInserter.this) { + DateHintPhase current = activeDateHintPhase; + if (current == null || current.phaseId != phaseId) { + DateHintPhase restored = new DateHintPhase(phaseId, retryCount, this); + restored.watchdogCancelIssued = watchdogCancelIssued; + activeDateHintPhase = restored; + nextDateHintPhaseId = Math.max(nextDateHintPhaseId, phaseId + 1); + } else { + current.watchdogCancelIssued |= watchdogCancelIssued; + current.lastProgressAtMillis = System.currentTimeMillis(); + } + } + scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS); + } + + private void onDateHintPhaseFinished( + ClientPutState completedState, InsertException failure, ClientContext context) { + boolean retryOnStallCancel; + synchronized (USKInserter.this) { + DateHintPhase phase = activeDateHintPhase; + if (phase == null) { + // Resume-order race: MultiPut resumes children before callback.onResume() rebuilds phase. + if (!awaitingPhaseRestore || completedBeforePhaseRestore) return; + retryOnStallCancel = false; + completedBeforePhaseRestore = true; + } else { + if (phase.phaseId != phaseId) return; + boolean watchdogCancelled = phase.watchdogCancelIssued || watchdogCancelIssued; + retryOnStallCancel = + failure != null + && watchdogCancelled + && failure.getMode() == InsertExceptionMode.CANCELLED + && retryCount < MAX_DATEHINT_STALL_RETRIES + && !isParentRequestCancelled(); + activeDateHintPhase = null; + } + } + if (retryOnStallCancel) { + LOG.warn( + "Retrying USK datehint insert phase for {} edition {} after watchdog cancel (attempt {}" + + " of {})", + USKInserter.this, + edition, + retryCount + 1, + MAX_DATEHINT_STALL_RETRIES); + startDateHintInsertPhase(context, edition, retryCount + 1, completedState); + return; + } + PutCompletionCallback callback = + getCompletionCallbackOrNull(failure != null ? "onFailure" : "onSuccess"); + if (callback == null) return; + if (failure != null) callback.onFailure(failure, completedState, context); + else callback.onSuccess(completedState, context); + } + + @Override + public void onSuccess(ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + onDateHintPhaseFinished(state, null, context); + } + + @Override + public void onFailure(InsertException e, ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + onDateHintPhaseFinished(state, e, context); + } + + @Override + public void onEncode(BaseClientKey usk, ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onEncode"); + if (callback != null) callback.onEncode(usk, state, context); + } + + @Override + public void onTransition( + ClientPutState oldState, ClientPutState newState, ClientContext context) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onTransition"); + if (callback != null) callback.onTransition(oldState, newState, context); + } + + @Override + public void onMetadata(Metadata m, ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataMetadata"); + if (callback != null) callback.onMetadata(m, state, context); + } + + @Override + public void onMetadata(Bucket meta, ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataBucket"); + if (callback != null) callback.onMetadata(meta, state, context); + } + + @Override + public void onFetchable(ClientPutState state) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onFetchable"); + if (callback != null) callback.onFetchable(state); + } + + @Override + public void onBlockSetFinished(ClientPutState state, ClientContext context) { + markDateHintProgress(phaseId); + PutCompletionCallback callback = getCompletionCallbackOrNull("onBlockSetFinished"); + if (callback != null) callback.onBlockSetFinished(state, context); + } + + @Override + public void onResume(ClientContext context) throws InsertException, ResumeFailedException { + if (!completedBeforePhaseRestore) { + onDateHintPhaseResumed(context); + } + awaitingPhaseRestore = false; + PutCompletionCallback callback = getCompletionCallbackOrNull("onResume"); + if (callback != null && callback != parent) callback.onResume(context); + } + + @Serial + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + awaitingPhaseRestore = true; + completedBeforePhaseRestore = false; + } + } + /** * Starts the asynchronous USK insert process. * @@ -264,11 +487,26 @@ private void insertSucceeded(ClientContext context, long edition) { cb.onSuccess(this, context); return; } + startDateHintInsertPhase(context, edition, 0, this); + } + + private void startDateHintInsertPhase( + ClientContext context, long edition, int retryCount, ClientPutState transitionFrom) { if (LOG.isDebugEnabled()) - LOG.debug("Inserted to edition {} - inserting USK date hints...", edition); + LOG.debug( + "Inserted to edition {} - inserting USK date hints (retry {} of {})...", + edition, + retryCount, + MAX_DATEHINT_STALL_RETRIES); USKDateHint hint = USKDateHint.now(); + long phaseId = reserveDateHintPhaseId(); + DateHintTerminalCallback terminalCallback = + new DateHintTerminalCallback(phaseId, edition, retryCount); MultiPutCompletionCallback m = - new MultiPutCompletionCallback(cb, parent, tokenObject, persistent, true); + new MultiPutCompletionCallback(terminalCallback, parent, tokenObject, persistent, true); + terminalCallback.bindGroup(m); + activateDateHintPhase(new DateHintPhase(phaseId, retryCount, terminalCallback)); + byte[] hintData = hint.getData(edition).getBytes(StandardCharsets.UTF_8); FreenetURI[] hintURIs = hint.getInsertURIs(privUSK); boolean added = false; @@ -290,6 +528,7 @@ private void insertSucceeded(ClientContext context, long edition) { } catch (IOException e) { LOG.error("Unable to insert USK date hints due to disk I/O error: {}", e, e); if (!added) { + clearActiveDateHintPhase(phaseId); cb.onFailure( new InsertException( InsertExceptionMode.BUCKET_ERROR, e, pubUSK.getSSK(edition).getURI()), @@ -300,13 +539,75 @@ private void insertSucceeded(ClientContext context, long edition) { } catch (InsertException e) { LOG.error("Unable to insert USK date hints due to error: {}", e, e); if (!added) { + clearActiveDateHintPhase(phaseId); cb.onFailure(e, this, context); return; } // Else try to insert the other hints. } } - cb.onTransition(this, m, context); + if (!added) { + clearActiveDateHintPhase(phaseId); + cb.onSuccess(this, context); + return; + } + cb.onTransition(transitionFrom, m, context); m.arm(context); + scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS); + } + + private synchronized long reserveDateHintPhaseId() { + return nextDateHintPhaseId++; + } + + private synchronized void activateDateHintPhase(DateHintPhase phase) { + activeDateHintPhase = phase; + nextDateHintPhaseId = Math.max(nextDateHintPhaseId, phase.phaseId + 1); + } + + private synchronized void clearActiveDateHintPhase(long phaseId) { + if (activeDateHintPhase != null && activeDateHintPhase.phaseId == phaseId) { + activeDateHintPhase = null; + } + } + + private void scheduleDateHintWatchdog(ClientContext context, long phaseId, long delayMillis) { + if (context.ticker == null) return; + long boundedDelay = Math.max(1L, delayMillis); + context.ticker.queueTimedJob(() -> runDateHintWatchdog(context, phaseId), boundedDelay); + } + + private void runDateHintWatchdog(ClientContext context, long phaseId) { + DateHintTerminalCallback callbackToCancel = null; + long rescheduleDelay = -1L; + long stalledFor; + int retryCount = 0; + synchronized (this) { + DateHintPhase phase = activeDateHintPhase; + if (phase == null || phase.phaseId != phaseId) return; + long now = System.currentTimeMillis(); + stalledFor = now - phase.lastProgressAtMillis; + if (stalledFor < DATEHINT_STALL_TIMEOUT_MILLIS) { + rescheduleDelay = DATEHINT_STALL_TIMEOUT_MILLIS - stalledFor; + } else if (!phase.watchdogCancelIssued) { + phase.watchdogCancelIssued = true; + phase.terminalCallback.watchdogCancelIssued = true; + callbackToCancel = phase.terminalCallback; + retryCount = phase.retryCount; + } + } + if (rescheduleDelay > 0L) { + scheduleDateHintWatchdog(context, phaseId, rescheduleDelay); + return; + } + if (callbackToCancel == null) return; + LOG.warn( + "USK datehint insert phase {} stalled for {} ms on {} (retry {} of {}) - cancelling phase", + phaseId, + stalledFor, + this, + retryCount, + MAX_DATEHINT_STALL_RETRIES); + callbackToCancel.cancelGroup(context); } private void scheduleInsert(ClientContext context) { @@ -530,6 +831,7 @@ public BaseClientPutter getParent() { public void cancel(ClientContext context) { USKFetcherTag tag; SingleBlockInserter localSbi; + DateHintTerminalCallback localDateHintCallback; synchronized (this) { if (finished) return; finished = true; @@ -537,6 +839,9 @@ public void cancel(ClientContext context) { fetcher = null; localSbi = sbi; sbi = null; + localDateHintCallback = + (activeDateHintPhase == null) ? null : activeDateHintPhase.terminalCallback; + activeDateHintPhase = null; } if (tag != null) { tag.cancel(context); @@ -544,6 +849,9 @@ public void cancel(ClientContext context) { if (localSbi != null) { localSbi.cancel(context); // will call onFailure, which will removeFrom() } + if (localDateHintCallback != null) { + localDateHintCallback.cancelGroup(context); + } if (freeData) { Bucket dataToFree; synchronized (this) { diff --git a/src/test/java/network/crypta/client/async/USKInserterTest.java b/src/test/java/network/crypta/client/async/USKInserterTest.java index b82507f63d..8a3856008f 100644 --- a/src/test/java/network/crypta/client/async/USKInserterTest.java +++ b/src/test/java/network/crypta/client/async/USKInserterTest.java @@ -5,6 +5,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serial; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.security.SecureRandom; import network.crypta.client.FetchContext; @@ -21,6 +24,7 @@ import network.crypta.keys.USK; import network.crypta.node.ClientContextResources; import network.crypta.support.PriorityAwareExecutor; +import network.crypta.support.Ticker; import network.crypta.support.api.Bucket; import org.jspecify.annotations.NonNull; import org.junit.jupiter.api.BeforeEach; @@ -33,10 +37,13 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -133,38 +140,7 @@ public void close() { @BeforeEach void setup() { - // Minimal ClientContext: only fields touched by these tests are used. - // Many collaborators are irrelevant here and can be passed as null or simple stubs. - context = - new ClientContext( - 1L, - new ClientContextRuntime( - Mockito.mock(ClientLayerPersister.class), - new InlineExecutor(), - null, - null, - new DummyRandomSource(), - new SecureRandom(), - null), - new ClientContextStorageFactories(null, null, null, null, null, null, null), - new ClientContextRafFactories(null, null), - new ClientContextServices( - new ClientContextResources(null, null), uskManager, null, null, null, null), - new ClientContextDefaults( - // Default persistent contexts used only when building fetchers internally; not - // exercised in these tests. - new FetchContext( - FetchContextOptions.builder() - .limits(0, 0, 0) - .archiveLimits(1, 0, 0, true) - .retryLimits(0, 0, 0) - .splitfileLimits(false, 0, 0) - .behavior(false, false, false) - .clientOptions(new SimpleEventProducer(), false, false) - .filterOverrides(null, null, null) - .build()), - newInsertContext(), - null)); + context = newContext(null); } // Helpers in tests create per-test SSKs to build consistent public/insertable USK URIs. @@ -253,6 +229,40 @@ public String toString() { }; } + private ClientContext newContext(Ticker ticker) { + // Minimal ClientContext: only fields touched by these tests are used. + // Many collaborators are irrelevant here and can be passed as null or simple stubs. + return new ClientContext( + 1L, + new ClientContextRuntime( + Mockito.mock(ClientLayerPersister.class), + new InlineExecutor(), + null, + ticker, + new DummyRandomSource(), + new SecureRandom(), + null), + new ClientContextStorageFactories(null, null, null, null, null, null, null), + new ClientContextRafFactories(null, null), + new ClientContextServices( + new ClientContextResources(null, null), uskManager, null, null, null, null), + new ClientContextDefaults( + // Default persistent contexts used only when building fetchers internally; not + // exercised in these tests. + new FetchContext( + FetchContextOptions.builder() + .limits(0, 0, 0) + .archiveLimits(1, 0, 0, true) + .retryLimits(0, 0, 0) + .splitfileLimits(false, 0, 0) + .behavior(false, false, false) + .clientOptions(new SimpleEventProducer(), false, false) + .filterOverrides(null, null, null) + .build()), + newInsertContext(), + null)); + } + private USKInserter newInserter( Bucket data, short compressionCodec, @@ -279,6 +289,77 @@ private USKInserter newInserter( new BlockInsertOptions(cfg.persistent, cfg.realTimeFlag, cfg.freeData, 0)); } + private static Object newDateHintTerminalCallback( + USKInserter inserter, long phaseId, long edition) throws Exception { + Class callbackClass = + Class.forName(USKInserter.class.getName() + "$DateHintTerminalCallback"); + Constructor ctor = + callbackClass.getDeclaredConstructor(USKInserter.class, long.class, long.class, int.class); + ctor.setAccessible(true); + return ctor.newInstance(inserter, phaseId, edition, 0); + } + + private static Object newDateHintPhase(long phaseId, Object callback) throws Exception { + Class callbackClass = + Class.forName(USKInserter.class.getName() + "$DateHintTerminalCallback"); + Class phaseClass = Class.forName(USKInserter.class.getName() + "$DateHintPhase"); + Constructor ctor = phaseClass.getDeclaredConstructor(long.class, int.class, callbackClass); + ctor.setAccessible(true); + return ctor.newInstance(phaseId, 0, callback); + } + + private static void setDateHintCallbackGroup(Object callback, MultiPutCompletionCallback group) + throws Exception { + Field f = callback.getClass().getDeclaredField("group"); + f.setAccessible(true); + f.set(callback, group); + } + + private static void setDateHintLastProgressAtMillis(Object phase, long value) throws Exception { + Field f = phase.getClass().getDeclaredField("lastProgressAtMillis"); + f.setAccessible(true); + f.setLong(phase, value); + } + + private static boolean isDateHintWatchdogCancelIssued(Object phase) throws Exception { + Field f = phase.getClass().getDeclaredField("watchdogCancelIssued"); + f.setAccessible(true); + return f.getBoolean(phase); + } + + private static void setDateHintWatchdogCancelIssued(Object phase) throws Exception { + Field f = phase.getClass().getDeclaredField("watchdogCancelIssued"); + f.setAccessible(true); + f.setBoolean(phase, true); + } + + private static void setActiveDateHintPhase(USKInserter inserter, Object phase) throws Exception { + Field activeField = USKInserter.class.getDeclaredField("activeDateHintPhase"); + activeField.setAccessible(true); + activeField.set(inserter, phase); + } + + private static Object getActiveDateHintPhase(USKInserter inserter) throws Exception { + Field activeField = USKInserter.class.getDeclaredField("activeDateHintPhase"); + activeField.setAccessible(true); + return activeField.get(inserter); + } + + private static void setDateHintTerminalAwaitingPhaseRestore( + PutCompletionCallback callback, boolean inProgress) throws Exception { + Field restoreField = callback.getClass().getDeclaredField("awaitingPhaseRestore"); + restoreField.setAccessible(true); + restoreField.setBoolean(callback, inProgress); + } + + private static void runDateHintWatchdog(USKInserter inserter, ClientContext context, long phaseId) + throws Exception { + Method watchdog = + USKInserter.class.getDeclaredMethod("runDateHintWatchdog", ClientContext.class, long.class); + watchdog.setAccessible(true); + watchdog.invoke(inserter, context, phaseId); + } + @Test void schedule_whenCalled_schedulesFetcher() throws Exception { // Arrange @@ -516,6 +597,330 @@ void cancel_whenActive_cancelsFetcher_freesData_andEmitsCancelledFailure() throw assertEquals(InsertExceptionMode.CANCELLED, ex.getValue().getMode()); } + @Test + void runDateHintWatchdog_whenPhaseStalled_cancelsActiveDateHintGroup() throws Exception { + // Arrange + byte[] bytes = "stalled-datehint".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "watchdog-stall"; + long edition = 5L; + long phaseId = 91L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false)); + + MultiPutCompletionCallback group = Mockito.mock(MultiPutCompletionCallback.class); + Object callback = newDateHintTerminalCallback(inserter, phaseId, edition); + setDateHintCallbackGroup(callback, group); + Object phase = newDateHintPhase(phaseId, callback); + setDateHintLastProgressAtMillis(phase, 0L); + setActiveDateHintPhase(inserter, phase); + + // Act + runDateHintWatchdog(inserter, context, phaseId); + + // Assert + verify(group, times(1)).cancel(context); + assertTrue(isDateHintWatchdogCancelIssued(phase)); + } + + @Test + void runDateHintWatchdog_whenRecentProgress_reschedulesWithoutCancelling() throws Exception { + // Arrange + Ticker ticker = Mockito.mock(Ticker.class); + ClientContext contextWithTicker = newContext(ticker); + byte[] bytes = "recent-datehint".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "watchdog-reschedule"; + long edition = 9L; + long phaseId = 92L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false)); + + MultiPutCompletionCallback group = Mockito.mock(MultiPutCompletionCallback.class); + Object callback = newDateHintTerminalCallback(inserter, phaseId, edition); + setDateHintCallbackGroup(callback, group); + Object phase = newDateHintPhase(phaseId, callback); + setDateHintLastProgressAtMillis(phase, System.currentTimeMillis()); + setActiveDateHintPhase(inserter, phase); + + // Act + runDateHintWatchdog(inserter, contextWithTicker, phaseId); + + // Assert + verify(group, never()).cancel(any(ClientContext.class)); + verify(ticker, times(1)) + .queueTimedJob(any(Runnable.class), Mockito.longThat(delay -> delay > 0L)); + assertFalse(isDateHintWatchdogCancelIssued(phase)); + } + + @Test + void dateHintTerminalCallback_onResume_restoresWatchdogCancelMarker() throws Exception { + // Arrange + byte[] bytes = "watchdog-restore-marker".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "watchdog-restore"; + long edition = 11L; + long phaseId = 93L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, true, false)); + + MultiPutCompletionCallback group = Mockito.mock(MultiPutCompletionCallback.class); + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + setDateHintCallbackGroup(terminalCallback, group); + Object phase = newDateHintPhase(phaseId, terminalCallback); + setDateHintLastProgressAtMillis(phase, 0L); + setActiveDateHintPhase(inserter, phase); + + // Trigger watchdog cancellation marker, then simulate restart by dropping transient phase + // state. + runDateHintWatchdog(inserter, context, phaseId); + setActiveDateHintPhase(inserter, null); + + // Act + terminalCallback.onResume(context); + + // Assert + Object restoredPhase = getActiveDateHintPhase(inserter); + assertTrue(isDateHintWatchdogCancelIssued(restoredPhase)); + } + + @Test + void dateHintTerminalCallback_onSuccess_forwardsTransitionedStateToParentCallback() + throws Exception { + // Arrange + byte[] bytes = "datehint-success".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "datehint-success-site"; + long edition = 4L; + long phaseId = 301L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false)); + + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + Object phase = newDateHintPhase(phaseId, terminalCallback); + setActiveDateHintPhase(inserter, phase); + + ClientPutState transitionedState = Mockito.mock(ClientPutState.class); + + // Act + terminalCallback.onSuccess(transitionedState, context); + + // Assert + ArgumentCaptor stateCaptor = ArgumentCaptor.forClass(ClientPutState.class); + verify(cb, times(1)).onSuccess(stateCaptor.capture(), any(ClientContext.class)); + assertSame(transitionedState, stateCaptor.getValue()); + } + + @Test + void dateHintTerminalCallback_onFailure_forwardsTransitionedStateToParentCallback() + throws Exception { + // Arrange + byte[] bytes = "datehint-failure".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "datehint-failure-site"; + long edition = 6L; + long phaseId = 302L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false)); + + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + Object phase = newDateHintPhase(phaseId, terminalCallback); + setActiveDateHintPhase(inserter, phase); + + ClientPutState transitionedState = Mockito.mock(ClientPutState.class); + InsertException failure = new InsertException(InsertExceptionMode.INTERNAL_ERROR); + + // Act + terminalCallback.onFailure(failure, transitionedState, context); + + // Assert + ArgumentCaptor stateCaptor = ArgumentCaptor.forClass(ClientPutState.class); + verify(cb, times(1)).onFailure(any(InsertException.class), stateCaptor.capture(), any()); + assertSame(transitionedState, stateCaptor.getValue()); + } + + @Test + void dateHintTerminalCallback_onSuccess_whenPhaseMissingDuringResumeRestore_forwardsSuccess() + throws Exception { + // Arrange + byte[] bytes = "datehint-resume-race".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "datehint-resume-race-site"; + long edition = 7L; + long phaseId = 401L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, true, false)); + + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + setDateHintTerminalAwaitingPhaseRestore(terminalCallback, true); + + ClientPutState transitionedState = Mockito.mock(ClientPutState.class); + + // Act + terminalCallback.onSuccess(transitionedState, context); + + // Assert + verify(cb, times(1)).onSuccess(Mockito.eq(transitionedState), any(ClientContext.class)); + } + + @Test + void dateHintTerminalCallback_onSuccess_whenPhaseMissingOutsideResumeRestore_ignoresEvent() + throws Exception { + // Arrange + byte[] bytes = "datehint-missing-phase".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "datehint-missing-phase-site"; + long edition = 8L; + long phaseId = 402L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, true, false)); + + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + setDateHintTerminalAwaitingPhaseRestore(terminalCallback, false); + + ClientPutState transitionedState = Mockito.mock(ClientPutState.class); + + // Act + terminalCallback.onSuccess(transitionedState, context); + + // Assert + verify(cb, never()).onSuccess(any(ClientPutState.class), any(ClientContext.class)); + } + + @Test + void dateHintTerminalCallback_onFailure_whenParentCancelled_doesNotRetryDatehintPhase() + throws Exception { + // Arrange + byte[] bytes = "datehint-cancel-guard".getBytes(StandardCharsets.UTF_8); + Bucket data = makeBucket(bytes); + String site = "datehint-cancel-guard-site"; + long edition = 9L; + long phaseId = 403L; + InsertableClientSSK ssk = InsertableClientSSK.createRandom(new DummyRandomSource(), site); + FreenetURI insertUri = + new FreenetURI( + "USK", + site, + null, + ssk.getInsertURI().getRoutingKey(), + ssk.getInsertURI().getCryptoKey(), + ssk.getInsertURI().getExtra(), + edition); + InsertContext ic = newInsertContext(); + USKInserter inserter = + newInserter( + data, (short) 0, insertUri, ic, new InserterCfg(false, false, false, false, false)); + when(parent.isCancelled()).thenReturn(true); + + PutCompletionCallback terminalCallback = + (PutCompletionCallback) newDateHintTerminalCallback(inserter, phaseId, edition); + Object phase = newDateHintPhase(phaseId, terminalCallback); + setDateHintWatchdogCancelIssued(phase); + setActiveDateHintPhase(inserter, phase); + + ClientPutState transitionedState = Mockito.mock(ClientPutState.class); + InsertException cancelled = new InsertException(InsertExceptionMode.CANCELLED); + + // Act + terminalCallback.onFailure(cancelled, transitionedState, context); + + // Assert + verify(cb, times(1)) + .onFailure(Mockito.eq(cancelled), Mockito.eq(transitionedState), any(ClientContext.class)); + verify(cb, never()).onTransition(any(ClientPutState.class), any(ClientPutState.class), any()); + } + @Test void onFailure_afterCancelAndDataReleased_doesNotThrow() throws Exception { byte[] bytes = "late-cancel".getBytes(StandardCharsets.UTF_8); @@ -693,6 +1098,4 @@ void onFailure_whenNonCollision_callsCallbackAndFreesData() throws Exception { verify(cb, times(1)).onFailure(Mockito.eq(ex2), any(), any()); verify(data2, times(1)).free(); } - - // Reflection helpers are intentionally omitted; tests use public APIs to prime state. }