ARTEMIS-1612 Fix message redistribution for prefixed addresses
This commit is contained in:
parent
9171d86abf
commit
f09bde07df
|
@ -892,7 +892,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
// as described on https://issues.jboss.org/browse/JBPAPP-6130
|
||||
Message copyRedistribute = message.copy(storageManager.generateID());
|
||||
|
||||
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddressSimpleString());
|
||||
Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());
|
||||
|
||||
if (bindings != null) {
|
||||
RoutingContext context = new RoutingContextImpl(tx);
|
||||
|
|
|
@ -1243,7 +1243,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
}
|
||||
if (params == null)
|
||||
params = new HashMap<>();
|
||||
return new TransportConfiguration(className, params);
|
||||
return new TransportConfiguration(className, params, UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String, Object>());
|
||||
}
|
||||
|
||||
protected void waitForServerToStart(ActiveMQServer server) throws InterruptedException {
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
|
@ -30,6 +31,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Bindable;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
|
@ -651,6 +653,52 @@ public class MessageRedistributionTest extends ClusterTestBase {
|
|||
verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids0, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedistributionWithPrefixesWhenRemoteConsumerIsAdded() throws Exception {
|
||||
|
||||
for (int i = 0; i <= 2; i++) {
|
||||
ActiveMQServer server = getServer(i);
|
||||
for (TransportConfiguration c : server.getConfiguration().getAcceptorConfigurations()) {
|
||||
c.getExtraParams().putIfAbsent("anycastPrefix", "jms.queue.");
|
||||
}
|
||||
}
|
||||
|
||||
setupCluster(MessageLoadBalancingType.ON_DEMAND);
|
||||
|
||||
startServers(0, 1, 2);
|
||||
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
setupSessionFactory(2, isNetty());
|
||||
|
||||
String name = "queues.queue";
|
||||
|
||||
createQueue(0, name, name, null, false, RoutingType.ANYCAST);
|
||||
createQueue(1, name, name, null, false, RoutingType.ANYCAST);
|
||||
createQueue(2, name, name, null, false, RoutingType.ANYCAST);
|
||||
|
||||
addConsumer(0, 0, name, null);
|
||||
|
||||
waitForBindings(0, name, 1, 1, true);
|
||||
waitForBindings(1, name, 1, 0, true);
|
||||
waitForBindings(2, name, 1, 0, true);
|
||||
|
||||
waitForBindings(0, name, 2, 0, false);
|
||||
waitForBindings(1, name, 2, 1, false);
|
||||
waitForBindings(2, name, 2, 1, false);
|
||||
|
||||
removeConsumer(0);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
send(0, "jms.queue." + name, 20, false, null);
|
||||
|
||||
addConsumer(1, 1, name, null);
|
||||
|
||||
verifyReceiveAll(20, 1);
|
||||
verifyNotReceive(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception {
|
||||
setupCluster(MessageLoadBalancingType.ON_DEMAND);
|
||||
|
|
Loading…
Reference in New Issue