This closes #1879
This commit is contained in:
commit
120bc1ed4c
|
@ -1578,6 +1578,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
@Message(id = 222269, value = "Please use a fixed value for \"journal-pool-files\". Default changed per https://issues.apache.org/jira/browse/ARTEMIS-1628", format = Message.Format.MESSAGE_FORMAT)
|
||||
void useFixedValueOnJournalPoolFiles();
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222270, value = "Unable to create management notification address: {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
void unableToCreateManagementNotificationAddress(SimpleString addressName, @Cause Exception e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||
void initializationError(@Cause Throwable e);
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
|||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.security.SecurityStore;
|
||||
import org.apache.activemq.artemis.core.server.ActivateCallback;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
@ -540,6 +541,33 @@ public class ManagementServiceImpl implements ManagementService {
|
|||
}
|
||||
|
||||
started = true;
|
||||
|
||||
/**
|
||||
* Ensure the management notification address is created otherwise if auto-create-address = false then cluster
|
||||
* bridges won't be able to connect.
|
||||
*/
|
||||
messagingServer.registerActivateCallback(new ActivateCallback() {
|
||||
@Override
|
||||
public void preActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activated() {
|
||||
try {
|
||||
messagingServer.addAddressInfo(new AddressInfo(managementNotificationAddress, RoutingType.MULTICAST));
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToCreateManagementNotificationAddress(managementNotificationAddress, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deActivate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void activationComplete() {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,7 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.Before;
|
||||
|
@ -229,6 +233,44 @@ public class SymmetricClusterTest extends ClusterTestBase {
|
|||
verifyNotReceive(0, 1, 2, 3, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicRoundRobinManyMessagesNoAddressAutoCreate() throws Exception {
|
||||
setupCluster();
|
||||
|
||||
startServers();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
servers[i].getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
setupSessionFactory(i, isNetty());
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
servers[i].addAddressInfo(new AddressInfo(SimpleString.toSimpleString("queues.testaddress"), RoutingType.MULTICAST));
|
||||
createQueue(i, "queues.testaddress", "queue0", null, false);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
addConsumer(i, i, "queue0", null);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
waitForBindings(i, "queues.testaddress", 1, 1, true);
|
||||
}
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
waitForBindings(i, "queues.testaddress", 4, 4, false);
|
||||
}
|
||||
|
||||
send(0, "queues.testaddress", 1000, true, null);
|
||||
|
||||
verifyReceiveRoundRobinInSomeOrder(1000, 0, 1, 2, 3, 4);
|
||||
|
||||
verifyNotReceive(0, 1, 2, 3, 4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoundRobinMultipleQueues() throws Exception {
|
||||
SymmetricClusterTest.log.info("starting");
|
||||
|
|
Loading…
Reference in New Issue