This commit is contained in:
Clebert Suconic 2018-01-17 11:19:07 -05:00
commit 60055d7a08
3 changed files with 50 additions and 2 deletions

View File

@ -892,7 +892,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// as described on https://issues.jboss.org/browse/JBPAPP-6130
Message copyRedistribute = message.copy(storageManager.generateID());
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
if (bindings != null) {
RoutingContext context = new RoutingContextImpl(tx);

View File

@ -1243,7 +1243,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
if (params == null)
params = new HashMap<>();
return new TransportConfiguration(className, params);
return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, Object>());
}
protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@ -30,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
@ -651,6 +653,52 @@ public class MessageRedistributionTest extends ClusterTestBase {
verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids0, 2);
}
@Test
public void testRedistributionWithPrefixesWhenRemoteConsumerIsAdded() throws Exception {
for (int i = 0; i <= 2; i++) {
ActiveMQServer server = getServer(i);
for (TransportConfiguration c : server.getConfiguration().getAcceptorConfigurations()) {
c.getExtraParams().putIfAbsent("anycastPrefix", "jms.queue.");
}
}
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
String name = "queues.queue";
createQueue(0, name, name, null, false, RoutingType.ANYCAST);
createQueue(1, name, name, null, false, RoutingType.ANYCAST);
createQueue(2, name, name, null, false, RoutingType.ANYCAST);
addConsumer(0, 0, name, null);
waitForBindings(0, name, 1, 1, true);
waitForBindings(1, name, 1, 0, true);
waitForBindings(2, name, 1, 0, true);
waitForBindings(0, name, 2, 0, false);
waitForBindings(1, name, 2, 1, false);
waitForBindings(2, name, 2, 1, false);
removeConsumer(0);
Thread.sleep(2000);
send(0, "jms.queue." + name, 20, false, null);
addConsumer(1, 1, name, null);
verifyReceiveAll(20, 1);
verifyNotReceive(1);
}
@Test
public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);