ARTEMIS-3295 - do not cluster advisories

https://issues.apache.org/jira/browse/ARTEMIS-3295
This commit is contained in:
Andy Taylor 2021-05-13 12:08:33 +01:00 committed by clebertsuconic
parent a3de3d4c75
commit a34d9aad6b
4 changed files with 79 additions and 0 deletions

View File

@ -177,6 +177,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
if (cc != null) {
cc.addClusterTopologyListener(this);
}
//make sure we don't cluster advisories
clusterManager.addProtocolIgnoredAddress(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
}
@Override

View File

@ -101,6 +101,8 @@ public final class ClusterManager implements ActiveMQComponent {
private final Configuration configuration;
private Set<String> protocolIgnoredAddresses = new HashSet<>();
public QuorumManager getQuorumManager() {
return clusterController.getQuorumManager();
}
@ -113,6 +115,7 @@ public final class ClusterManager implements ActiveMQComponent {
return haManager;
}
public void addClusterChannelHandler(Channel channel,
Acceptor acceptorUsed,
CoreRemotingConnection remotingConnection,
@ -337,6 +340,8 @@ public final class ClusterManager implements ActiveMQComponent {
state = State.STOPPED;
clearClusterConnections();
protocolIgnoredAddresses.clear();
}
public void flushExecutor() {
@ -574,6 +579,15 @@ public final class ClusterManager implements ActiveMQComponent {
}
}
public ClusterManager addProtocolIgnoredAddress(String ignoredAddress) {
protocolIgnoredAddresses.add(ignoredAddress);
return this;
}
public Collection<String> getProtocolIgnoredAddresses() {
return protocolIgnoredAddresses;
}
// Private methods ----------------------------------------------------------------------------------------------------
private void clearClusterConnections() {

View File

@ -75,6 +75,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
private final SimpleString managementNotificationAddress;
private ClientConsumer notifConsumer;
private final SimpleString idsHeaderName;
@ -86,6 +87,7 @@ public class ClusterConnectionBridge extends BridgeImpl {
private final ServerLocatorInternal discoveryLocator;
private final String storeAndForwardPrefix;
private TopologyMemberImpl member;
public ClusterConnectionBridge(final ClusterConnection clusterConnection,
@ -362,6 +364,10 @@ public class ClusterConnectionBridge extends BridgeImpl {
}
filterString += "!" + storeAndForwardPrefix;
filterString += ",!" + managementAddress;
//add any protocol specific ignored addresses
for (String ignoreAddress : clusterManager.getProtocolIgnoredAddresses()) {
filterString += ",!" + ignoreAddress;
}
return filterString;
}

View File

@ -28,11 +28,16 @@ import org.apache.activemq.artemis.tests.integration.cluster.distribution.Cluste
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.junit.Assert;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -71,6 +76,57 @@ public class MessageRedistributionTest extends ClusterTestBase {
}
}
@Test
public void testAdvisoriesNotClustered() throws Exception {
setupServer(0, true, true);
setupServer(1, true, true);
setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 0, 1);
setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, true, 1, 0);
startServers(0, 1);
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
setupSessionFactory(0, true);
setupSessionFactory(1, true);
createAddressInfo(0, "testAddress", RoutingType.MULTICAST, -1, false);
createAddressInfo(1, "testAddress", RoutingType.MULTICAST, -1, false);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
ActiveMQConnectionFactory factory2 = new ActiveMQConnectionFactory(getServerUri(1));
Connection conn = null;
Connection conn2 = null;
CountDownLatch active = new CountDownLatch(1);
try {
conn = factory.createConnection();
conn2 = factory2.createConnection();
conn2.setClientID("id");
conn2.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic dest = (Topic) ActiveMQDestination.createDestination("testAddress", ActiveMQDestination.TOPIC_TYPE);
TopicSubscriber mySubscriber = session2.createDurableSubscriber(dest, "mySubscriber");
MessageProducer producer = session.createProducer(dest);
producer.send(session.createTextMessage("test message"));
Message message = mySubscriber.receive(5000);
SimpleString advQueue = new SimpleString("ActiveMQ.Advisory.TempQueue");
SimpleString advTopic = new SimpleString("ActiveMQ.Advisory.TempTopic");
//we create a consumer on node 2 and assert that the advisory subscription queue is not clustered
Assert.assertEquals("", 1, servers[0].getPostOffice().getBindingsForAddress(advQueue).getBindings().size());
Assert.assertEquals("", 1, servers[0].getPostOffice().getBindingsForAddress(advTopic).getBindings().size());
Assert.assertEquals("", 1, servers[1].getPostOffice().getBindingsForAddress(advQueue).getBindings().size());
Assert.assertEquals("", 1, servers[1].getPostOffice().getBindingsForAddress(advTopic).getBindings().size());
} finally {
conn.close();
conn2.close();
}
}
@Override
protected boolean isResolveProtocols() {
return true;