From 0f4a8c3c2d869527b4575e5b45b527d7640f9852 Mon Sep 17 00:00:00 2001 From: Erich Duda Date: Tue, 22 Aug 2017 21:48:19 +0200 Subject: [PATCH] ARTEMIS-1368 Artemis gets to state when it doesn't respond to producer There is a leak on replication tokens in the moment when a backup is shutdowned or killed and the ReplicationManager is stopped. If there are some tasks (holding replication tokens) in the executor, these tokens are simply ignored and replicationDone method isn't called on them. Because of this, some tasks in OperationContextImpl cannot be finished. (cherry picked from commit 88a018e17fd49097de1186c65e25cd0af578b6a9) (cherry picked from commit d6cbc0aa885fa88beb7f1b2450cdfe4da9466947) --- .../core/replication/ReplicationManager.java | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index d298a24880..4241996fa5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -347,34 +347,26 @@ public final class ReplicationManager implements ActiveMQComponent { } private OperationContext sendReplicatePacket(final Packet packet, boolean lineUp) { - if (!enabled) + if (!enabled) { + packet.release(); return null; - boolean runItNow = false; + } final OperationContext repliToken = OperationContextImpl.getContext(executorFactory); if (lineUp) { repliToken.replicationLineUp(); } - if (enabled) { - replicationStream.execute(() -> { - if (enabled) { - pendingTokens.add(repliToken); - flowControl(packet.expectedEncodeSize()); - replicatingChannel.send(packet); - } - }); - } else { - // Already replicating channel failed, so just play the action now - runItNow = true; - packet.release(); - } - - // Execute outside lock - - if (runItNow) { - repliToken.replicationDone(); - } + replicationStream.execute(() -> { + if (enabled) { + pendingTokens.add(repliToken); + flowControl(packet.expectedEncodeSize()); + replicatingChannel.send(packet); + } else { + packet.release(); + repliToken.replicationDone(); + } + }); return repliToken; }