From de5c0d51b976a2a3c60235da56cfd854418007a7 Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Mon, 9 Apr 2018 11:07:49 +0800 Subject: [PATCH] ARTEMIS-1791 Large message files are not removed after redistribution across a cluster --- .../artemis/core/postoffice/Bindings.java | 2 ++ .../core/postoffice/impl/BindingsImpl.java | 5 ++++ .../core/postoffice/impl/PostOfficeImpl.java | 25 +++++++++++++++---- .../server/cluster/impl/Redistributor.java | 1 + .../impl/WildcardAddressManagerUnitTest.java | 5 ++++ 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index f3592c4a79..30a268056d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener { boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; void route(Message message, RoutingContext context) throws Exception; + + boolean allowRedistribute(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 2e2b31cfdf..478c700b4b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -154,6 +154,11 @@ public final class BindingsImpl implements Bindings { } } + @Override + public boolean allowRedistribute() { + return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND); + } + @Override public boolean redistribute(final Message message, final Queue originatingQueue, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b2bfe37e07..f1f7a3808d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public Pair redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { - // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message - // arrived the target node - // as described on https://issues.jboss.org/browse/JBPAPP-6130 - Message copyRedistribute = message.copy(storageManager.generateID()); Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress()); - if (bindings != null) { + if (bindings != null && bindings.allowRedistribute()) { + // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message + // arrived the target node + // as described on https://issues.jboss.org/browse/JBPAPP-6130 + Message copyRedistribute = message.copy(storageManager.generateID()); + if (tx != null) { + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterRollback(Transaction tx) { + try { + //this will cause large message file to be + //cleaned up + copyRedistribute.decrementRefCount(); + } catch (Exception e) { + logger.warn("Failed to clean up message: " + copyRedistribute); + } + } + }); + } + RoutingContext context = new RoutingContextImpl(tx); boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index cfb9eeefae..79820186bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -150,6 +150,7 @@ public class Redistributor implements Consumer { final Pair routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); if (routingInfo == null) { + tx.rollback(); return HandleStatus.BUSY; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 1c13cbde5b..40fadf9d9f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { public void route(Message message, RoutingContext context) throws Exception { System.out.println("routing message: " + message); } + + @Override + public boolean allowRedistribute() { + return false; + } } }