Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
314 changes: 311 additions & 3 deletions src/main/java/network/crypta/client/async/USKInserter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import network.crypta.client.InsertContext;
import network.crypta.client.InsertException.InsertExceptionMode;
import network.crypta.client.InsertException;
Expand Down Expand Up @@ -125,6 +126,12 @@ public final class USKInserter
/** After this many attempted slots without success, fall back to re-fetch the latest. */
private static final long MAX_TRIED_SLOTS = 10;

/** Maximum time to wait for USK datehint child inserts to make progress before retrying. */
private static final long DATEHINT_STALL_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(15);

/** Retry stalled USK datehint phase at most this many times before surfacing failure. */
private static final int MAX_DATEHINT_STALL_RETRIES = 1;

/** If true, frees {@link #data} after completion or terminal failure. */
private final boolean freeData;

Expand All @@ -140,6 +147,222 @@ public final class USKInserter
/** Optional override for the crypto key material; {@code null} to auto-generate. */
final byte[] forceCryptoKey;

/** Active datehint insert phase; null when not currently inserting datehints. */
private transient DateHintPhase activeDateHintPhase;

/** Monotonic identifier for datehint phases; helps ignore stale callbacks. */
private transient long nextDateHintPhaseId = 1;

/**
 * In-memory bookkeeping for the datehint insert phase that is currently in flight.
 *
 * <p>Nothing here is persisted. After a restart the terminal callback's {@link
 * PutCompletionCallback#onResume(ClientContext)} path constructs a fresh instance from the values
 * the callback itself carries.
 */
private static final class DateHintPhase {
  /** Identifier used to tell this phase apart from earlier (stale) phases. */
  final long phaseId;

  /** Number of stall-triggered retries that preceded this phase. */
  final int retryCount;

  /** Callback that receives the terminal result of this phase's inserts. */
  final DateHintTerminalCallback terminalCallback;

  /** Wall-clock time (ms) of the most recent progress notification; starts at creation time. */
  long lastProgressAtMillis = System.currentTimeMillis();

  /** Set once the watchdog has requested cancellation of this phase. */
  boolean watchdogCancelIssued = false;

  DateHintPhase(long id, int retries, DateHintTerminalCallback callback) {
    phaseId = id;
    retryCount = retries;
    terminalCallback = callback;
  }
}

/**
* Terminal callback for a datehint phase.
*
* <p>Delegates normal notifications to {@link #cb}, while intercepting terminal success/failure
* so we can retry a stalled phase once when the watchdog cancels it for lack of progress.
*/
private final class DateHintTerminalCallback implements PutCompletionCallback, Serializable {
@Serial private static final long serialVersionUID = 1L;

private final long phaseId;
private final long edition;
private final int retryCount;
private MultiPutCompletionCallback group;
private volatile boolean watchdogCancelIssued;
private transient volatile boolean awaitingPhaseRestore;
private transient volatile boolean completedBeforePhaseRestore;

DateHintTerminalCallback(long phaseId, long edition, int retryCount) {
this.phaseId = phaseId;
this.edition = edition;
this.retryCount = retryCount;
this.watchdogCancelIssued = false;
this.awaitingPhaseRestore = false;
}

void bindGroup(MultiPutCompletionCallback group) {
this.group = group;
}

void cancelGroup(ClientContext context) {
MultiPutCompletionCallback localGroup = group;
if (localGroup != null) localGroup.cancel(context);
}

private boolean isParentRequestCancelled() {
BaseClientPutter parentPutter = parent;
return parentPutter == null || parentPutter.isCancelled();
}

private PutCompletionCallback getCompletionCallbackOrNull(String eventName) {
PutCompletionCallback callback = cb;
if (callback == null && LOG.isDebugEnabled()) {
LOG.debug(
"Dropping datehint {} callback for {} phase {} because completion callback is null",
eventName,
USKInserter.this,
phaseId);
}
return callback;
}

private void markDateHintProgress(long phaseId) {
synchronized (USKInserter.this) {
if (activeDateHintPhase == null || activeDateHintPhase.phaseId != phaseId) return;
activeDateHintPhase.lastProgressAtMillis = System.currentTimeMillis();
}
}

private void onDateHintPhaseResumed(ClientContext context) {
synchronized (USKInserter.this) {
DateHintPhase current = activeDateHintPhase;
if (current == null || current.phaseId != phaseId) {
DateHintPhase restored = new DateHintPhase(phaseId, retryCount, this);
restored.watchdogCancelIssued = watchdogCancelIssued;
activeDateHintPhase = restored;
nextDateHintPhaseId = Math.max(nextDateHintPhaseId, phaseId + 1);
} else {
current.watchdogCancelIssued |= watchdogCancelIssued;
current.lastProgressAtMillis = System.currentTimeMillis();
}
}
scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS);
}

private void onDateHintPhaseFinished(
ClientPutState completedState, InsertException failure, ClientContext context) {
boolean retryOnStallCancel;
synchronized (USKInserter.this) {
DateHintPhase phase = activeDateHintPhase;
if (phase == null) {
// Resume-order race: MultiPut resumes children before callback.onResume() rebuilds phase.
if (!awaitingPhaseRestore || completedBeforePhaseRestore) return;
retryOnStallCancel = false;
completedBeforePhaseRestore = true;
Comment on lines +260 to +261

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Retry stalled datehint failures during resume race

When a persistent request restarts and a terminal callback arrives before onResume() rebuilds activeDateHintPhase (awaitingPhaseRestore path), this block forces retryOnStallCancel to false. In that timing window, a watchdog-triggered CANCELLED failure is forwarded immediately even if retry budget remains, so stalled datehint phases that should get one retry fail permanently after restart.

Useful? React with 👍 / 👎.

} else {
if (phase.phaseId != phaseId) return;
boolean watchdogCancelled = phase.watchdogCancelIssued || watchdogCancelIssued;
retryOnStallCancel =
failure != null
&& watchdogCancelled
&& failure.getMode() == InsertExceptionMode.CANCELLED
&& retryCount < MAX_DATEHINT_STALL_RETRIES
&& !isParentRequestCancelled();
activeDateHintPhase = null;
}
}
if (retryOnStallCancel) {
LOG.warn(
"Retrying USK datehint insert phase for {} edition {} after watchdog cancel (attempt {}"
+ " of {})",
USKInserter.this,
edition,
retryCount + 1,
MAX_DATEHINT_STALL_RETRIES);
startDateHintInsertPhase(context, edition, retryCount + 1, completedState);
return;
}
PutCompletionCallback callback =
getCompletionCallbackOrNull(failure != null ? "onFailure" : "onSuccess");
if (callback == null) return;
if (failure != null) callback.onFailure(failure, completedState, context);
else callback.onSuccess(completedState, context);
}

@Override
public void onSuccess(ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
onDateHintPhaseFinished(state, null, context);
}

@Override
public void onFailure(InsertException e, ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
onDateHintPhaseFinished(state, e, context);
}

@Override
public void onEncode(BaseClientKey usk, ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onEncode");
if (callback != null) callback.onEncode(usk, state, context);
}

@Override
public void onTransition(
ClientPutState oldState, ClientPutState newState, ClientContext context) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onTransition");
if (callback != null) callback.onTransition(oldState, newState, context);
}

@Override
public void onMetadata(Metadata m, ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataMetadata");
if (callback != null) callback.onMetadata(m, state, context);
}

@Override
public void onMetadata(Bucket meta, ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onMetadataBucket");
if (callback != null) callback.onMetadata(meta, state, context);
}

@Override
public void onFetchable(ClientPutState state) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onFetchable");
if (callback != null) callback.onFetchable(state);
}

@Override
public void onBlockSetFinished(ClientPutState state, ClientContext context) {
markDateHintProgress(phaseId);
PutCompletionCallback callback = getCompletionCallbackOrNull("onBlockSetFinished");
if (callback != null) callback.onBlockSetFinished(state, context);
}

@Override
public void onResume(ClientContext context) throws InsertException, ResumeFailedException {
if (!completedBeforePhaseRestore) {
onDateHintPhaseResumed(context);
}
awaitingPhaseRestore = false;
PutCompletionCallback callback = getCompletionCallbackOrNull("onResume");
if (callback != null && callback != parent) callback.onResume(context);
}

@Serial
private void readObject(java.io.ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
awaitingPhaseRestore = true;
completedBeforePhaseRestore = false;
}
}

/**
* Starts the asynchronous USK insert process.
*
Expand Down Expand Up @@ -264,11 +487,26 @@ private void insertSucceeded(ClientContext context, long edition) {
cb.onSuccess(this, context);
return;
}
startDateHintInsertPhase(context, edition, 0, this);
}

private void startDateHintInsertPhase(
ClientContext context, long edition, int retryCount, ClientPutState transitionFrom) {
if (LOG.isDebugEnabled())
LOG.debug("Inserted to edition {} - inserting USK date hints...", edition);
LOG.debug(
"Inserted to edition {} - inserting USK date hints (retry {} of {})...",
edition,
retryCount,
MAX_DATEHINT_STALL_RETRIES);
USKDateHint hint = USKDateHint.now();
long phaseId = reserveDateHintPhaseId();
DateHintTerminalCallback terminalCallback =
new DateHintTerminalCallback(phaseId, edition, retryCount);
MultiPutCompletionCallback m =
new MultiPutCompletionCallback(cb, parent, tokenObject, persistent, true);
new MultiPutCompletionCallback(terminalCallback, parent, tokenObject, persistent, true);
terminalCallback.bindGroup(m);
activateDateHintPhase(new DateHintPhase(phaseId, retryCount, terminalCallback));

byte[] hintData = hint.getData(edition).getBytes(StandardCharsets.UTF_8);
FreenetURI[] hintURIs = hint.getInsertURIs(privUSK);
boolean added = false;
Expand All @@ -290,6 +528,7 @@ private void insertSucceeded(ClientContext context, long edition) {
} catch (IOException e) {
LOG.error("Unable to insert USK date hints due to disk I/O error: {}", e, e);
if (!added) {
clearActiveDateHintPhase(phaseId);
cb.onFailure(
new InsertException(
InsertExceptionMode.BUCKET_ERROR, e, pubUSK.getSSK(edition).getURI()),
Expand All @@ -300,13 +539,75 @@ private void insertSucceeded(ClientContext context, long edition) {
} catch (InsertException e) {
LOG.error("Unable to insert USK date hints due to error: {}", e, e);
if (!added) {
clearActiveDateHintPhase(phaseId);
cb.onFailure(e, this, context);
return;
} // Else try to insert the other hints.
}
}
cb.onTransition(this, m, context);
if (!added) {
clearActiveDateHintPhase(phaseId);
cb.onSuccess(this, context);
return;
}
cb.onTransition(transitionFrom, m, context);
m.arm(context);
scheduleDateHintWatchdog(context, phaseId, DATEHINT_STALL_TIMEOUT_MILLIS);
}

/** Hands out the current value of the phase-id counter and advances the counter by one. */
private synchronized long reserveDateHintPhaseId() {
  final long reserved = nextDateHintPhaseId;
  nextDateHintPhaseId = reserved + 1;
  return reserved;
}

/**
 * Installs {@code phase} as the active datehint phase and makes sure the phase-id counter is
 * strictly greater than the installed phase's id.
 */
private synchronized void activateDateHintPhase(DateHintPhase phase) {
  activeDateHintPhase = phase;
  final long firstUnusedId = phase.phaseId + 1;
  if (firstUnusedId > nextDateHintPhaseId) {
    nextDateHintPhaseId = firstUnusedId;
  }
}

/** Drops the active datehint phase, but only if it is the phase identified by {@code phaseId}. */
private synchronized void clearActiveDateHintPhase(long phaseId) {
  final DateHintPhase current = activeDateHintPhase;
  if (current == null) return;
  if (current.phaseId != phaseId) return;
  activeDateHintPhase = null;
}

/**
 * Queues a watchdog run for the given phase on the context's ticker.
 *
 * <p>Does nothing when the context has no ticker. Delays below one millisecond are raised to
 * one.
 */
private void scheduleDateHintWatchdog(ClientContext context, long phaseId, long delayMillis) {
  if (context.ticker != null) {
    final long effectiveDelay = delayMillis < 1L ? 1L : delayMillis;
    context.ticker.queueTimedJob(() -> runDateHintWatchdog(context, phaseId), effectiveDelay);
  }
}

/**
 * Watchdog body for one datehint phase.
 *
 * <p>Returns immediately if the phase is no longer the active one. If the phase reported progress
 * within {@link #DATEHINT_STALL_TIMEOUT_MILLIS}, the watchdog is re-queued for the remaining
 * time. Otherwise, the first time the stall is observed, the phase and its terminal callback are
 * flagged as watchdog-cancelled and the callback's insert group is cancelled; the cancel call
 * itself is made after the lock on this inserter has been released.
 */
private void runDateHintWatchdog(ClientContext context, long phaseId) {
  final long idleMillis;
  long requeueAfterMillis = -1L;
  DateHintTerminalCallback stalledCallback = null;
  int attemptsSoFar = 0;
  synchronized (this) {
    final DateHintPhase current = activeDateHintPhase;
    if (current == null || current.phaseId != phaseId) return;
    idleMillis = System.currentTimeMillis() - current.lastProgressAtMillis;
    final boolean stillProgressing = idleMillis < DATEHINT_STALL_TIMEOUT_MILLIS;
    if (stillProgressing) {
      requeueAfterMillis = DATEHINT_STALL_TIMEOUT_MILLIS - idleMillis;
    } else if (!current.watchdogCancelIssued) {
      // Flag both the in-memory phase and the callback that owns the insert group.
      stalledCallback = current.terminalCallback;
      attemptsSoFar = current.retryCount;
      current.watchdogCancelIssued = true;
      stalledCallback.watchdogCancelIssued = true;
    }
  }
  if (requeueAfterMillis > 0L) {
    scheduleDateHintWatchdog(context, phaseId, requeueAfterMillis);
    return;
  }
  if (stalledCallback != null) {
    LOG.warn(
        "USK datehint insert phase {} stalled for {} ms on {} (retry {} of {}) - cancelling phase",
        phaseId,
        idleMillis,
        this,
        attemptsSoFar,
        MAX_DATEHINT_STALL_RETRIES);
    stalledCallback.cancelGroup(context);
  }
}

private void scheduleInsert(ClientContext context) {
Expand Down Expand Up @@ -530,20 +831,27 @@ public BaseClientPutter getParent() {
public void cancel(ClientContext context) {
USKFetcherTag tag;
SingleBlockInserter localSbi;
DateHintTerminalCallback localDateHintCallback;
synchronized (this) {
if (finished) return;
finished = true;
tag = fetcher;
fetcher = null;
localSbi = sbi;
sbi = null;
localDateHintCallback =
(activeDateHintPhase == null) ? null : activeDateHintPhase.terminalCallback;
activeDateHintPhase = null;
}
if (tag != null) {
tag.cancel(context);
}
if (localSbi != null) {
localSbi.cancel(context); // will call onFailure, which will removeFrom()
}
if (localDateHintCallback != null) {
localDateHintCallback.cancelGroup(context);
}
if (freeData) {
Bucket dataToFree;
synchronized (this) {
Expand Down
Loading
Loading