This commit is contained in:
Clebert Suconic 2018-04-09 11:11:45 -04:00
commit ce4670f294
6 changed files with 78 additions and 5 deletions

View File

@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener {
boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception;
void route(Message message, RoutingContext context) throws Exception;
boolean allowRedistribute();
}

View File

@ -154,6 +154,11 @@ public final class BindingsImpl implements Bindings {
}
}
@Override
public boolean allowRedistribute() {
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
}
@Override
public boolean redistribute(final Message message,
final Queue originatingQueue,

View File

@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
public Pair<RoutingContext, Message> redistribute(final Message message,
final Queue originatingQueue,
final Transaction tx) throws Exception {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
if (bindings != null) {
if (bindings != null && bindings.allowRedistribute()) {
// We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
// arrived the target node
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
if (tx != null) {
tx.addOperation(new TransactionOperationAbstract() {
@Override
public void afterRollback(Transaction tx) {
try {
//this will cause large message file to be
//cleaned up
copyRedistribute.decrementRefCount();
} catch (Exception e) {
logger.warn("Failed to clean up message: " + copyRedistribute);
}
}
});
}
RoutingContext context = new RoutingContextImpl(tx);
boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

View File

@ -150,6 +150,7 @@ public class Redistributor implements Consumer {
final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routingInfo == null) {
tx.rollback();
return HandleStatus.BUSY;
}

View File

@ -16,11 +16,56 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.io.File;
import java.util.Arrays;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Test;
public class LargeMessageRedistributionTest extends MessageRedistributionTest {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@Override
public boolean isLargeMessage() {
return true;
}
@Test
public void testRedistributionLargeMessageDirCleanup() throws Exception {
final long delay = 1000;
final int numMessages = 5;
setRedistributionDelay(delay);
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 0, true);
waitForBindings(0, "queues.testaddress", 1, 0, false);
waitForBindings(1, "queues.testaddress", 1, 0, false);
send(0, "queues.testaddress", numMessages, false, null);
addConsumer(0, 0, "queue0", null);
verifyReceiveAll(numMessages, 0);
removeConsumer(0);
addConsumer(1, 1, "queue0", null);
verifyReceiveAll(numMessages, 1);
removeConsumer(1);
Wait.assertEquals(0, () -> getServer(0).getConfiguration().getLargeMessagesLocation().listFiles().length);
Wait.assertEquals(numMessages, () -> getServer(1).getConfiguration().getLargeMessagesLocation().listFiles().length);
}
}

View File

@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
public void route(Message message, RoutingContext context) throws Exception {
System.out.println("routing message: " + message);
}
@Override
public boolean allowRedistribute() {
return false;
}
}
}