ARTEMIS-4560 Fixing defaults on Broker Connections for Broker Properties

ARTEMIS-4566 Allow management of Mirror SNF internal queue
This commit is contained in:
Clebert Suconic 2024-01-08 23:04:01 -05:00 committed by clebertsuconic
parent f990c9a597
commit 5e7a9023d8
13 changed files with 367 additions and 6 deletions

View File

@ -55,7 +55,7 @@ public class ServerUtil {
final Process process = internalStartServer(artemisInstance, serverName, brokerProperties);
// wait for start
if (timeout != 0) {
if (timeout > 0) {
waitForServerToStart(id, timeout);
}

View File

@ -501,6 +501,8 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
mirrorControlQueue = server.createQueue(new QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true), true);
}
server.registerQueueOnManagement(mirrorControlQueue, true);
logger.debug("Mirror queue {}", mirrorControlQueue.getName());
mirrorControlQueue.setMirrorController(true);

View File

@ -23,7 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
public class AMQPMirrorBrokerConnectionElement extends AMQPBrokerConnectionElement {
boolean durable;
boolean durable = true;
boolean queueCreation = true;

View File

@ -973,6 +973,8 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.
*

View File

@ -3944,6 +3944,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
removeAddressInfo(address, auth);
}
/** Register a queue on the management registry */
@Override
public void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception {
managementService.registerQueue(queue, queue.getAddress(), storageManager, registerInternal);
}
@Override
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception {
if (auth != null) {

View File

@ -100,6 +100,8 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void registerQueue(Queue queue, SimpleString address, StorageManager storageManager) throws Exception;
void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception;
void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception;
void registerAcceptor(Acceptor acceptor, TransportConfiguration configuration) throws Exception;

View File

@ -287,8 +287,15 @@ public class ManagementServiceImpl implements ManagementService {
public synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager) throws Exception {
registerQueue(queue, addressInfo, storageManager, false);
}
if (addressInfo.isInternal() || queue.isInternalQueue()) {
private synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager,
boolean forceInternal) throws Exception {
if (!forceInternal && (addressInfo.isInternal() || queue.isInternalQueue())) {
logger.debug("won't register internal queue: {}", queue);
return;
}
@ -314,6 +321,14 @@ public class ManagementServiceImpl implements ManagementService {
registerQueue(queue, new AddressInfo(address), storageManager);
}
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager,
final boolean forceInternal) throws Exception {
registerQueue(queue, new AddressInfo(address), storageManager, forceInternal);
}
@Override
public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);

View File

@ -268,6 +268,11 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
public void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception {
}
@Override
public void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception {

View File

@ -76,5 +76,18 @@ public class HelperBase {
return this;
}
public HelperBase addArgs(String... args) {
int initialLength = this.args == null ? 0 : this.args.length;
String[] newArgs = new String[initialLength + args.length];
for (int i = 0; i < initialLength; i++) {
newArgs[i] = this.args[i];
}
for (int i = 0; i < args.length; i++) {
newArgs[i + initialLength] = args[i];
}
this.args = newArgs;
return this;
}
String[] args = new String[0];
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
@ -94,6 +95,11 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
}
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
@ -579,6 +585,9 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Assert.assertTrue(server_2.locateQueue(replica.getMirrorSNF()).getMessagesAdded() > 0);
}
SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:" + AMQP_PORT_2, null, null);
Wait.assertEquals(0, () -> simpleManagement.getMessageCountOnQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror-source"), 5000);
try (Connection connection = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT).createConnection()) {
connection.start();

View File

@ -691,7 +691,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null, 5000);
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();

View File

@ -68,14 +68,14 @@ public class PagedMirrorTest extends ActiveMQTestBase {
server1.getConfiguration().getAcceptorConfigurations().clear();
server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616");
AMQPBrokerConnectConfiguration brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 1024, -1, -1);
server2.getConfiguration().getAcceptorConfigurations().clear();
server2.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61617");
brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", "tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement());
brokerConnectConfiguration.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(false));
server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
server1.start();

View File

@ -0,0 +1,307 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MirroredTopicSoakTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static String largeBody;
private static String smallBody = "This is a small body";
static {
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 1024 * 1024) {
writer.append("This is a large string ..... ");
}
largeBody = writer.toString();
}
public static final String DC1_NODE_A = "mirror/DC1/A";
public static final String DC2_NODE_A = "mirror/DC2/A";
public static final String DC1_NODE_B = "mirror/DC1/B";
public static final String DC2_NODE_B = "mirror/DC2/B";
Process processDC1_node_A;
Process processDC1_node_B;
Process processDC2_node_A;
Process processDC2_node_B;
private static String DC1_NODEA_URI = "tcp://localhost:61616";
private static String DC1_NODEB_URI = "tcp://localhost:61617";
private static String DC2_NODEA_URI = "tcp://localhost:61618";
private static String DC2_NODEB_URI = "tcp://localhost:61619";
private static void createServer(String serverName, String connectionName, String clusterURI, String mirrorURI, int porOffset) throws Exception {
File serverLocation = getFileServerLocation(serverName);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
cliCreateServer.setClustered(true);
cliCreateServer.setNoWeb(true);
cliCreateServer.setStaticCluster(clusterURI);
cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", DC1_NODE_A);
cliCreateServer.addArgs("--addresses", "order");
cliCreateServer.addArgs("--queues", "myQueue");
cliCreateServer.setPortOffset(porOffset);
cliCreateServer.createServer();
Properties brokerProperties = new Properties();
brokerProperties.put("AMQPConnections." + connectionName + ".uri", mirrorURI);
brokerProperties.put("AMQPConnections." + connectionName + ".retryInterval", "1000");
brokerProperties.put("AMQPConnections." + connectionName + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
brokerProperties.put("AMQPConnections." + connectionName + ".connectionElements.mirror.sync", "false");
brokerProperties.put("largeMessageSync", "false");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
saveProperties(brokerProperties, brokerPropertiesFile);
}
@BeforeClass
public static void createServers() throws Exception {
createServer(DC1_NODE_A, "mirror", DC1_NODEB_URI, DC2_NODEA_URI, 0);
createServer(DC1_NODE_B, "mirror", DC1_NODEA_URI, DC2_NODEB_URI, 1);
createServer(DC2_NODE_A, "mirror", DC2_NODEB_URI, DC1_NODEA_URI, 2);
createServer(DC2_NODE_B, "mirror", DC2_NODEA_URI, DC1_NODEB_URI, 3);
}
private void startServers() throws Exception {
processDC1_node_A = startServer(DC1_NODE_A, -1, -1, new File(getServerLocation(DC1_NODE_A), "broker.properties"));
processDC1_node_B = startServer(DC1_NODE_B, -1, -1, new File(getServerLocation(DC1_NODE_B), "broker.properties"));
processDC2_node_A = startServer(DC2_NODE_A, -1, -1, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
processDC2_node_B = startServer(DC2_NODE_B, -1, -1, new File(getServerLocation(DC2_NODE_B), "broker.properties"));
ServerUtil.waitForServerToStart(0, 10_000);
ServerUtil.waitForServerToStart(1, 10_000);
ServerUtil.waitForServerToStart(2, 10_000);
ServerUtil.waitForServerToStart(3, 10_000);
}
@Test
public void testQueue() throws Exception {
startServers();
final int numberOfMessages = 200;
Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0);
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_NODEA_URI);
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", DC2_NODEA_URI);
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
SimpleManagement simpleManagementDC1A = new SimpleManagement(DC1_NODEA_URI, null, null);
try (Connection connection = connectionFactoryDC1A.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message;
boolean large;
if (i % 1 == 2) {
message = session.createTextMessage(largeBody);
large = true;
} else {
message = session.createTextMessage(smallBody);
large = false;
}
message.setIntProperty("i", i);
message.setBooleanProperty("large", large);
producer.send(message);
if (i % 100 == 0) {
logger.debug("commit {}", i);
session.commit();
}
}
session.commit();
}
logger.debug("All messages were sent");
try (Connection connection = connectionFactoryDC1A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfMessages / 2; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
}
session.commit();
}
processDC2_node_A.destroyForcibly();
processDC2_node_A.waitFor();
processDC2_node_A = startServer(DC2_NODE_A, 2, 5000, new File(getServerLocation(DC2_NODE_A), "broker.properties"));
Wait.assertEquals(0L, () -> getCount(simpleManagementDC1A, snfQueue), 250_000, 1000);
try (Connection connection = connectionFactoryDC2A.createConnection()) {
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfMessages / 2; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
}
session.commit();
}
}
@Test
public void testMirroredTopics() throws Exception {
startServers();
final int numberOfMessages = 200;
Assert.assertTrue("numberOfMessages must be even", numberOfMessages % 2 == 0);
String clientIDA = "nodeA";
String clientIDB = "nodeB";
String subscriptionID = "my-order";
String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
ConnectionFactory connectionFactoryDC1B = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617");
ConnectionFactory connectionFactoryDC2A = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61618");
ConnectionFactory connectionFactoryDC2B = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61619");
SimpleManagement simpleManagementDC1B = new SimpleManagement(DC1_NODEB_URI, null, null);
SimpleManagement simpleManagementDC2B = new SimpleManagement(DC2_NODEB_URI, null, null);
consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, 0, false);
try (Connection connection = connectionFactoryDC1B.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("order");
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numberOfMessages; i++) {
TextMessage message;
boolean large;
message = session.createTextMessage(largeBody);
large = true;
message.setIntProperty("i", i);
message.setBooleanProperty("large", large);
producer.send(message);
if (i % 100 == 0) {
logger.debug("commit {}", i);
session.commit();
}
}
session.commit();
}
logger.debug("Consuming from DC1B");
consume(connectionFactoryDC1B, clientIDB, subscriptionID, 0, numberOfMessages / 2, false);
processDC2_node_B.destroyForcibly();
processDC2_node_B.waitFor();
processDC2_node_B = startServer(DC2_NODE_B, 3, 5000, new File(getServerLocation(DC2_NODE_B), "broker.properties"));
Wait.assertEquals(0L, () -> getCount(simpleManagementDC1B, snfQueue), 250_000, 1000);
Wait.assertEquals(numberOfMessages / 2, () -> simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000);
logger.debug("Consuming from DC2B with {}", simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"));
consume(connectionFactoryDC2B, clientIDB, subscriptionID, numberOfMessages / 2, numberOfMessages / 2, true);
Wait.assertEquals(0, () -> simpleManagementDC2B.getMessageCountOnQueue("nodeB.my-order"), 10000);
Wait.assertEquals(0, () -> simpleManagementDC1B.getMessageCountOnQueue("nodeB.my-order"), 10000);
consume(connectionFactoryDC1B, clientIDB, subscriptionID, numberOfMessages, 0, true);
logger.debug("DC1B nodeB.my-order=0");
}
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
}
private static void consume(ConnectionFactory factory, String clientID, String subscriptionID, int start, int numberOfMessages, boolean expectEmpty) throws Exception {
try (Connection connection = factory.createConnection()) {
connection.setClientID(clientID);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("order");
connection.start();
MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID);
boolean failed = false;
for (int i = start; i < start + numberOfMessages; i++) {
TextMessage message = (TextMessage) consumer.receive(10_000);
Assert.assertNotNull(message);
logger.debug("Received message {}, large={}", message.getIntProperty("i"), message.getBooleanProperty("large"));
if (message.getIntProperty("i") != i) {
failed = true;
logger.warn("Expected message {} but got {}", i, message.getIntProperty("i"));
}
if (message.getBooleanProperty("large")) {
Assert.assertEquals(largeBody, message.getText());
} else {
Assert.assertEquals(smallBody, message.getText());
}
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
}
session.commit();
Assert.assertFalse(failed);
if (expectEmpty) {
Assert.assertNull(consumer.receiveNoWait());
}
}
}
}