Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public boolean trySubmitDenominate(MasternodeAddress mnAddr) {
public boolean markAlreadyJoinedQueueAsTried(CoinJoinQueue dsq) {
lock.lock();
try {
for (CoinJoinClientSession session :deqSessions){
for (CoinJoinClientSession session : deqSessions) {
Masternode mnMixing;
if ((mnMixing = session.getMixingMasternodeInfo()) != null && mnMixing.getProTxHash().equals(dsq.getProTxHash())) {
dsq.setTried(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,9 @@ public void processDSQueue(Peer from, CoinJoinQueue dsq, boolean enable_bip61) {

log.info("coinjoin: DSQUEUE: new {} from mn {}", dsq, dmn.getService().getAddr());

coinJoinManager.coinJoinClientManagers.values().stream().anyMatch(new Predicate<CoinJoinClientManager>() {
@Override
public boolean test(CoinJoinClientManager coinJoinClientManager) {
return coinJoinClientManager.markAlreadyJoinedQueueAsTried(dsq);
}
});
coinJoinManager.coinJoinClientManagers.values().stream().anyMatch(
coinJoinClientManager -> coinJoinClientManager.markAlreadyJoinedQueueAsTried(dsq)
);

if (queueLock.tryLock()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.bitcoinj.evolution.SimplifiedMasternodeListManager;
import org.bitcoinj.quorums.ChainLocksHandler;
import org.bitcoinj.quorums.QuorumRotationInfo;
import org.bitcoinj.utils.ContextPropagatingThreadFactory;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.wallet.Wallet;
import org.bitcoinj.wallet.WalletEx;
Expand All @@ -68,6 +69,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -94,6 +97,8 @@ public class CoinJoinManager {
private RequestKeyParameter requestKeyParameter;
private RequestDecryptedKey requestDecryptedKey;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService messageProcessingExecutor = Executors.newFixedThreadPool(5,
new ContextPropagatingThreadFactory("CoinJoin-MessageProcessor"));
Comment on lines +100 to +101
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the CoinJoinManager.java file to see the full lifecycle methods
fd -t f CoinJoinManager.java | head -1 | xargs wc -l

Repository: dashpay/dashj

Length of output: 128


🏁 Script executed:

# Get the complete file to understand the lifecycle
fd -t f CoinJoinManager.java | head -1 | xargs cat -n

Repository: dashpay/dashj

Length of output: 25617


🏁 Script executed:

# Search for start(), stop(), close() method definitions in CoinJoinManager
rg -n 'public.*(?:void\s+)?(start|stop|close)\(' core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java -A 20

Repository: dashpay/dashj

Length of output: 2138


🏁 Script executed:

# Search for usage patterns of stop() and start() calls in the codebase
rg -n 'coinJoinManager\.(stop|start|close)\(' --type=java -B 2 -A 2

Repository: dashpay/dashj

Length of output: 3053


🏁 Script executed:

# Check if there's any recreation or reinitialization of CoinJoinManager
rg -n 'new CoinJoinManager\(' --type=java -B 2 -A 2

Repository: dashpay/dashj

Length of output: 782


Executor lifecycle constraint: reuse not supported after stop() is called.

The executor is declared final and initialized once at field declaration. The start() method does not reinitialize it. Once stop() shuts it down (lines 216-226), calling start() again—or receiving messages before close() removes listeners—will cause preMessageReceivedEventListener to submit tasks to a shutdown executor, resulting in RejectedExecutionException.

While the current codebase follows a safe pattern (start → stop → close without restart), this architectural constraint should be documented and enforced. Consider either:

  • Recreating the executor in start() and shutting it down only in close()
  • Adding a guard in preMessageReceivedEventListener to check executor state before submission
  • Documenting that start() must not be called after stop()

Additionally, line 513 contains a typo: "meessages" should be "messages".

🤖 Prompt for AI Agents
In `@core/src/main/java/org/bitcoinj/coinjoin/utils/CoinJoinManager.java` around
lines 100 - 101, The messageProcessingExecutor is created once as a final field
and shut down in stop(), so subsequent start() calls or
preMessageReceivedEventListener submissions can hit RejectedExecutionException;
fix by either (A) moving initialization of messageProcessingExecutor into
start() (recreate a new ExecutorService there) and only shut it down in close(),
or (B) add a guard in preMessageReceivedEventListener that checks
messageProcessingExecutor.isShutdown()/isTerminated() and skips submission (or
logs and drops) if executor is not accepting tasks; update stop()/close() to
match the chosen lifecycle (ensure no double-shutdown) and add a brief JavaDoc
to start()/stop()/close() describing the non-restartable or restartable
contract, and correct the typo "meessages" → "messages" referenced near the
comment at line 513.

protected final ReentrantLock lock = Threading.lock("coinjoin-manager");

private boolean finishCurrentSessions = false;
Expand Down Expand Up @@ -206,6 +211,19 @@ public void stop() {
}
stopAsync();
finishCurrentSessions = false;

// Shutdown the message processing executor
messageProcessingExecutor.shutdown();
try {
if (!messageProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
log.warn("CoinJoin message processing executor did not terminate in time, forcing shutdown");
messageProcessingExecutor.shutdownNow();
}
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for message processing executor to terminate", e);
messageProcessingExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -262,6 +280,10 @@ public void close() {
if (masternodeGroup != null) {
masternodeGroup.removePreMessageReceivedEventListener(preMessageReceivedEventListener);
}
// Ensure executor is shut down
if (!messageProcessingExecutor.isShutdown()) {
messageProcessingExecutor.shutdown();
}
}

public boolean isMasternodeOrDisconnectRequested(MasternodeAddress address) {
Expand Down Expand Up @@ -483,7 +505,15 @@ public void processTransaction(Transaction tx) {
}

public final PreMessageReceivedEventListener preMessageReceivedEventListener = (peer, m) -> {
if (isCoinJoinMessage(m)) {
if (m instanceof CoinJoinQueue) {
// Offload DSQueue message processing to thread pool to avoid blocking network I/O thread
messageProcessingExecutor.execute(() -> {
processMessage(peer, m);
});
// Return null as dsq meessages are only processed above
return null;
} else if (isCoinJoinMessage(m)) {
// Process other CoinJoin messages synchronously
return processMessage(peer, m);
}
return m;
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/org/bitcoinj/core/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -1762,6 +1762,7 @@ public void setCoinJoinTransactionType(CoinJoinTransactionType coinJoinTransacti
}

/* returns false if inputs > 4 or there are less than the required confirmations */
@Deprecated
public boolean isSimple() {
if(inputs.size() > MAX_INPUTS_FOR_AUTO_IX)
return false;
Expand Down
Loading
Loading