ARTEMIS-1688 fix cluster when auto-create-addresses=false
This commit is contained in:
parent
3c7d57c9cb
commit
ab602351a1
|
@ -1578,6 +1578,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
@Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void useFixedValueOnJournalPoolFiles();
|
void useFixedValueOnJournalPoolFiles();
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
|
@Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.ERROR)
|
@LogMessage(level = Logger.Level.ERROR)
|
||||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||||
void initializationError(@Cause Throwable e);
|
void initializationError(@Cause Throwable e);
|
||||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.security.SecurityStore;
|
import org.apache.activemq.artemis.core.security.SecurityStore;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
@ -540,6 +541,33 @@ public class ManagementServiceImpl implements ManagementService {
|
||||||
}
|
}
|
||||||
|
|
||||||
started = true;
|
started = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure the management notification address is created otherwise if auto-create-address = false then cluster
|
||||||
|
* bridges won't be able to connect.
|
||||||
|
*/
|
||||||
|
messagingServer.registerActivateCallback(new ActivateCallback() {
|
||||||
|
@Override
|
||||||
|
public void preActivate() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void activated() {
|
||||||
|
try {
|
||||||
|
messagingServer.addAddressInfo(new AddressInfo(managementNotificationAddress, RoutingType.MULTICAST));
|
||||||
|
} catch (Exception e) {
|
||||||
|
ActiveMQServerLogger.LOGGER.unableToCreateManagementNotificationAddress(managementNotificationAddress, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deActivate() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void activationComplete() {
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -16,7 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -229,6 +233,44 @@ public class SymmetricClusterTest extends ClusterTestBase {
|
||||||
verifyNotReceive(0, 1, 2, 3, 4);
|
verifyNotReceive(0, 1, 2, 3, 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBasicRoundRobinManyMessagesNoAddressAutoCreate() throws Exception {
|
||||||
|
setupCluster();
|
||||||
|
|
||||||
|
startServers();
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
servers[i].getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
setupSessionFactory(i, isNetty());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
servers[i].addAddressInfo(new AddressInfo(SimpleString.toSimpleString("queues.testaddress"), RoutingType.MULTICAST));
|
||||||
|
createQueue(i, "queues.testaddress", "queue0", null, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
addConsumer(i, i, "queue0", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
waitForBindings(i, "queues.testaddress", 1, 1, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
waitForBindings(i, "queues.testaddress", 4, 4, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
send(0, "queues.testaddress", 1000, true, null);
|
||||||
|
|
||||||
|
verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4);
|
||||||
|
|
||||||
|
verifyNotReceive(0, 1, 2, 3, 4);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRoundRobinMultipleQueues() throws Exception {
|
public void testRoundRobinMultipleQueues() throws Exception {
|
||||||
SymmetricClusterTest.log.info("starting");
|
SymmetricClusterTest.log.info("starting");
|
||||||
|
|
Loading…
Reference in New Issue