This closes #4506
This commit is contained in:
commit
5722206e92
|
@ -269,7 +269,9 @@ public final class BindingsImpl implements Bindings {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
|
logger.debug("Message {} being copied as {}", message.getMessageID(), copyRedistribute.getMessageID());
|
||||||
}
|
}
|
||||||
|
|
||||||
copyRedistribute.setAddress(message.getAddress());
|
copyRedistribute.setAddress(message.getAddress());
|
||||||
|
copyRedistribute.clearInternalProperties();
|
||||||
|
|
||||||
if (context.getTransaction() == null) {
|
if (context.getTransaction() == null) {
|
||||||
context.setTransaction(new TransactionImpl(storageManager));
|
context.setTransaction(new TransactionImpl(storageManager));
|
||||||
|
|
|
@ -630,6 +630,17 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||||
boolean autoCommitAcks,
|
boolean autoCommitAcks,
|
||||||
final String user,
|
final String user,
|
||||||
final String password) throws Exception {
|
final String password) throws Exception {
|
||||||
|
addConsumer(consumerID, node, queueName, filterVal, autoCommitAcks, user, password, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addConsumer(final int consumerID,
|
||||||
|
final int node,
|
||||||
|
final String queueName,
|
||||||
|
final String filterVal,
|
||||||
|
boolean autoCommitAcks,
|
||||||
|
final String user,
|
||||||
|
final String password,
|
||||||
|
final int ackBatchSize) throws Exception {
|
||||||
try {
|
try {
|
||||||
if (consumers[consumerID] != null) {
|
if (consumers[consumerID] != null) {
|
||||||
throw new IllegalArgumentException("Already a consumer at " + node);
|
throw new IllegalArgumentException("Already a consumer at " + node);
|
||||||
|
@ -641,7 +652,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
||||||
throw new IllegalArgumentException("No sf at " + node);
|
throw new IllegalArgumentException("No sf at " + node);
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE));
|
ClientSession session = addClientSession(sf.createSession(user, password, false, false, autoCommitAcks, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ackBatchSize));
|
||||||
|
|
||||||
String filterString = null;
|
String filterString = null;
|
||||||
|
|
||||||
|
|
|
@ -173,6 +173,85 @@ public class MessageRedistributionTest extends ClusterTestBase {
|
||||||
logger.debug("Test done");
|
logger.debug("Test done");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRedistributionWithMultipleQueuesOnTheSameAddress() throws Exception {
|
||||||
|
final int MESSAGE_COUNT = 10;
|
||||||
|
final String ADDRESS = "myAddress";
|
||||||
|
final String QUEUE0 = "queue0";
|
||||||
|
final String QUEUE1 = "queue1";
|
||||||
|
|
||||||
|
getServer(0).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0));
|
||||||
|
getServer(1).getConfiguration().addAddressSetting(ADDRESS, new AddressSettings().setRedistributionDelay(0));
|
||||||
|
|
||||||
|
setupClusterConnection("cluster0", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||||
|
setupClusterConnection("cluster1", ADDRESS, MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||||
|
|
||||||
|
startServers(0, 1);
|
||||||
|
|
||||||
|
waitForTopology(servers[0], 2);
|
||||||
|
waitForTopology(servers[1], 2);
|
||||||
|
|
||||||
|
setupSessionFactory(0, isNetty());
|
||||||
|
setupSessionFactory(1, isNetty());
|
||||||
|
|
||||||
|
createQueue(0, ADDRESS, QUEUE0, null, false);
|
||||||
|
createQueue(0, ADDRESS, QUEUE1, null, false);
|
||||||
|
|
||||||
|
createQueue(1, ADDRESS, QUEUE0, null, false);
|
||||||
|
createQueue(1, ADDRESS, QUEUE1, null, false);
|
||||||
|
|
||||||
|
addConsumer(0, 0, QUEUE0, null, true, null, null, 0);
|
||||||
|
addConsumer(1, 1, QUEUE0, null, true, null, null, 0);
|
||||||
|
|
||||||
|
waitForBindings(0, ADDRESS, 2, 1, true);
|
||||||
|
waitForBindings(0, ADDRESS, 2, 1, false);
|
||||||
|
|
||||||
|
waitForBindings(1, ADDRESS, 2, 1, true);
|
||||||
|
waitForBindings(1, ADDRESS, 2, 1, false);
|
||||||
|
|
||||||
|
send(0, ADDRESS, MESSAGE_COUNT, true, null, RoutingType.MULTICAST, null);
|
||||||
|
|
||||||
|
{ // make sure all the messages were delivered to the proper queues & nodes
|
||||||
|
Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);
|
||||||
|
Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAdded(), 2000, 100);
|
||||||
|
|
||||||
|
Wait.assertEquals(10L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
|
||||||
|
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
|
||||||
|
{
|
||||||
|
ClientMessage m = consumers[0].getConsumer().receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
ClientMessage m = consumers[1].getConsumer().receive(1000);
|
||||||
|
assertNotNull(m);
|
||||||
|
m.acknowledge();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{ // make sure all the messages were consumed propertly
|
||||||
|
Wait.assertEquals(5L, () -> servers[0].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
|
||||||
|
Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
|
||||||
|
|
||||||
|
Wait.assertEquals(5L, () -> servers[1].locateQueue(QUEUE0).getMessagesAcknowledged(), 2000, 100);
|
||||||
|
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add consumer to force redistribution of messages to node 1
|
||||||
|
addConsumer(2, 1, QUEUE1, null);
|
||||||
|
waitForBindings(1, ADDRESS, 2, 2, true);
|
||||||
|
waitForBindings(0, ADDRESS, 2, 2, false);
|
||||||
|
|
||||||
|
Wait.assertEquals(10L, () -> servers[1].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
|
||||||
|
Wait.assertEquals(0L, () -> servers[0].locateQueue(QUEUE1).getMessageCount(), 2000, 100);
|
||||||
|
|
||||||
|
// ensure no messages were inadvertently redistributed to the wrong queue (i.e. the main point of this test)
|
||||||
|
Wait.assertEquals(0L, () -> servers[1].locateQueue(QUEUE0).getMessageCount(), 2000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
//https://issues.jboss.org/browse/HORNETQ-1057
|
//https://issues.jboss.org/browse/HORNETQ-1057
|
||||||
@Test
|
@Test
|
||||||
public void testRedistributionStopsWhenConsumerAdded() throws Exception {
|
public void testRedistributionStopsWhenConsumerAdded() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue