ARTEMIS-1791 Large message files are not removed after redistribution across a cluster

This commit is contained in:
Howard Gao 2018-04-09 11:07:49 +08:00 committed by Clebert Suconic
parent c69d6b0476
commit de5c0d51b9
5 changed files with 33 additions and 5 deletions

View File

@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener {
boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
void route(Message message, RoutingContext context) throws Exception;
boolean allowRedistribute();
}

View File

@ -154,6 +154,11 @@ public final class BindingsImpl implements Bindings {
}
}
@Override
public boolean allowRedistribute() {
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
}
@Override
public boolean redistribute(final Message message,
final Queue originatingQueue,

View File

@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
if (bindings != null) {
if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
if (tx != null) {
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterRollback(Transaction tx) {
try {
//this will cause large message file to be
//cleaned up
copyRedistribute.decrementRefCount();
} catch (Exception e) {
logger.warn("Failed to clean up message: " + copyRedistribute);
}
}
});
}
RoutingContext context = new RoutingContextImpl(tx);
boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

View File

@ -150,6 +150,7 @@ public class Redistributor implements Consumer {
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
tx.rollback();
return HandleStatus.BUSY;
}

View File

@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
public void route(Message message, RoutingContext context) throws Exception {
System.out.println("routing message: " + message);
}
@Override
public boolean allowRedistribute() {
return false;
}
}
}