ARTEMIS-4498 Expose internal queues for management and observability

This commit is contained in:
a181321 2024-03-15 11:15:07 -04:00 committed by clebertsuconic
parent 6e3c7e055d
commit 51f39fc34d
13 changed files with 116 additions and 203 deletions

View File

@ -127,6 +127,12 @@ public interface QueueControl {
@Attribute(desc = DURABLE_PERSISTENT_SIZE_DESCRIPTION)
long getDurablePersistentSize();
/**
* Returns whether this queue was created for the broker's internal use.
*/
@Attribute(desc = "whether this queue was created for the broker's internal use")
boolean isInternalQueue();
/**
* Returns the number of scheduled messages in this queue.
*/

View File

@ -104,7 +104,8 @@ var Artemis;
{name: "Ring Size", visible: false},
{name: "Consumers Before Dispatch", visible: false},
{name: "Delay Before Dispatch", visible: false},
{name: "Auto Delete", visible: false}
{name: "Auto Delete", visible: false},
{name: "Internal", visible: false}
]
};
@ -142,7 +143,8 @@ var Artemis;
{id: 'paused', name: 'Paused'},
{id: 'temporary', name: 'Temporary'},
{id: 'autoCreated', name: 'Auto Created'},
{id: 'autoDelete', name: 'Auto Delete'}
{id: 'autoDelete', name: 'Auto Delete'},
{id: 'internalQueue', name: 'Internal'}
],
operationOptions: [
{id: 'EQUALS', name: 'Equals'},
@ -218,7 +220,8 @@ var Artemis;
{ header: 'Ring Size', itemField: 'ringSize'},
{ header: 'Consumers Before Dispatch', itemField: 'consumersBeforeDispatch'},
{ header: 'Delay Before Dispatch', itemField: 'delayBeforeDispatch'},
{ header: 'Auto Delete', itemField: 'autoDelete'}
{ header: 'Auto Delete', itemField: 'autoDelete'},
{ header: 'Internal', itemField: 'internalQueue'}
];
ctrl.refresh = function () {

View File

@ -1054,6 +1054,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
return value == null ? 0 : value;
}
@Override
public boolean isInternalQueue() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isInternal(queue);
}
checkStarted();
clearIO();
try {
return queue.isInternalQueue();
} finally {
blockOnIO();
}
}
@Override
public String countMessages(final String filterStr, final String groupByProperty) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {

View File

@ -53,7 +53,8 @@ public enum QueueField {
RING_SIZE("ringSize"),
CONSUMERS_BEFORE_DISPATCH("consumersBeforeDispatch"),
DELAY_BEFORE_DISPATCH("delayBeforeDispatch"),
AUTO_DELETE("autoDelete");
AUTO_DELETE("autoDelete"),
INTERNAL_QUEUE("internalQueue");
private static final Map<String, QueueField> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

View File

@ -77,7 +77,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
.add(QueueField.RING_SIZE.getName(), toString(queue.getRingSize()))
.add(QueueField.CONSUMERS_BEFORE_DISPATCH.getName(), toString(queue.getConsumersBeforeDispatch()))
.add(QueueField.DELAY_BEFORE_DISPATCH.getName(), toString(queue.getDelayBeforeDispatch()))
.add(QueueField.AUTO_DELETE.getName(), toString(q.isAutoDelete()));
.add(QueueField.AUTO_DELETE.getName(), toString(q.isAutoDelete()))
.add(QueueField.INTERNAL_QUEUE.getName(), toString(q.isInternalQueue()));
return obj;
}
@ -152,6 +153,8 @@ public class QueueView extends ActiveMQAbstractView<QueueControl> {
return q.getConsumersBeforeDispatch();
case DELAY_BEFORE_DISPATCH:
return q.getDelayBeforeDispatch();
case INTERNAL_QUEUE:
return q.isInternalQueue();
default:
throw new IllegalArgumentException("Unsupported field, " + fieldName);
}

View File

@ -555,9 +555,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
// only register address if it is new
if (result) {
try {
if (!addressInfo.isInternal()) {
managementService.registerAddress(addressInfo);
}
managementService.registerAddress(addressInfo);
if (server.hasBrokerAddressPlugins()) {
server.callBrokerAddressPlugins(plugin -> plugin.afterAddAddress(addressInfo, reload));
}

View File

@ -4191,9 +4191,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw e;
}
if (!queueConfiguration.isInternal()) {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
managementService.registerQueue(queue, queue.getAddress(), storageManager);
copyRetroactiveMessages(queue);

View File

@ -427,6 +427,8 @@ public class AddressInfo {
} else if (key.equals("created-timestamp")) {
JsonNumber jsonLong = (JsonNumber) value;
this.createdTimestamp = jsonLong.longValue();
} else if (key.equals("internal")) {
this.internal = Boolean.valueOf(value.toString());
}
}

View File

@ -2974,7 +2974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
/**
* @return the internalQueue
* @return if queue is internal
*/
@Override
public boolean isInternalQueue() {

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -27,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -35,6 +37,7 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Ignore;
@ -363,6 +366,55 @@ public class SimpleSymmetricClusterTest extends ClusterTestBase {
}
@Test
public void testSimpleSnFManagement() throws Exception {
final String address = "queues.testaddress";
final String queue = "queue0";
setupServer(0, false, isNetty());
setupServer(1, false, isNetty());
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
createQueue(0, address, queue, null, false);
createQueue(1, address, queue, null, false);
addConsumer(0, 0, queue, null);
addConsumer(1, 1, queue, null);
waitForBindings(0, address, 1, 1, true);
waitForBindings(1, address, 1, 1, true);
waitForBindings(0, address, 1, 1, false);
waitForBindings(1, address, 1, 1, false);
SimpleString SnFQueueName = SimpleString.toSimpleString(
Arrays.stream(servers[0].getActiveMQServerControl().getQueueNames()).filter(
queueName -> queueName.contains(servers[0].getInternalNamingPrefix()))
.findFirst()
.orElse(null));
Assert.assertNotNull(SnFQueueName);
QueueControl queueControl = ManagementControlHelper.createQueueControl(SnFQueueName, SnFQueueName, RoutingType.MULTICAST, servers[0].getMBeanServer());
//check that internal queue can be managed
queueControl.pause();
Assert.assertTrue(queueControl.isPaused());
queueControl.resume();
Assert.assertFalse(queueControl.isPaused());
closeAllConsumers();
}
@Test
public void testSimple2() throws Exception {
setupServer(0, true, isNetty());

View File

@ -278,6 +278,26 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testRegisterInternalQueues() throws Exception {
SimpleString queue = RandomUtil.randomSimpleString();
server.createQueue(new QueueConfiguration(queue).setDurable(durable).setInternal(true));
QueueControl queueControl = createManagementControl(queue, queue);
Assert.assertNotNull(queueControl);
Assert.assertTrue(server.locateQueue(queue).isInternalQueue());
Assert.assertEquals(queue.toString(), queueControl.getName());
Assert.assertEquals(durable, queueControl.isDurable());
//check that internal queue can be managed
queueControl.pause();
Assert.assertTrue(queueControl.isPaused());
queueControl.resume();
Assert.assertFalse(queueControl.isPaused());
}
@Test
public void testAutoDeleteAttribute() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();

View File

@ -252,6 +252,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (String) proxy.retrieveAttributeValue("lastValueKey");
}
@Override
public boolean isInternalQueue() {
return (boolean) proxy.retrieveAttributeValue("internalQueue");
}
@Override
public int getConsumersBeforeDispatch() {
return (Integer) proxy.retrieveAttributeValue("consumersBeforeDispatch");

View File

@ -1,191 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.management;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ProducerEventSource;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class OpenWireManagementTest extends OpenWireTestBase {
private ActiveMQServerControl serverControl;
private SimpleString queueName1 = new SimpleString("queue1");
private SimpleString queueName2 = new SimpleString("queue2");
private SimpleString queueName3 = new SimpleString("queue3");
private ConnectionFactory factory;
@Parameterized.Parameters(name = "useDefault={0},supportAdvisory={1},suppressJmx={2}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{true, false, false},
{false, true, false},
{false, true, true},
{false, false, false},
{false, false, true}
});
}
private boolean useDefault;
private boolean supportAdvisory;
private boolean suppressJmx;
public OpenWireManagementTest(boolean useDefault, boolean supportAdvisory, boolean suppressJmx) {
this.useDefault = useDefault;
this.supportAdvisory = supportAdvisory;
this.suppressJmx = suppressJmx;
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
serverControl = (ActiveMQServerControl) ManagementControlHelper.createProxy(ObjectNameBuilder.DEFAULT.getActiveMQServerObjectName(), ActiveMQServerControl.class, mbeanServer);
factory = new ActiveMQConnectionFactory(urlString);
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
serverConfig.setJMXManagementEnabled(true);
if (useDefault) {
//don't set parameters explicitly
return;
}
Set<TransportConfiguration> acceptorConfigs = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tconfig : acceptorConfigs) {
if ("netty".equals(tconfig.getName())) {
Map<String, Object> params = tconfig.getExtraParams();
params.put("supportAdvisory", supportAdvisory);
params.put("suppressInternalManagementObjects", suppressJmx);
}
}
}
@Test
public void testHiddenInternalAddress() throws Exception {
server.createQueue(new QueueConfiguration(queueName1).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
server.createQueue(new QueueConfiguration(queueName2).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
server.createQueue(new QueueConfiguration(queueName3).setRoutingType(RoutingType.ANYCAST).setAutoCreateAddress(true));
String[] addresses = serverControl.getAddressNames();
assertEquals(4, addresses.length);
for (String addr : addresses) {
assertFalse(addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX));
}
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName1.toString());
ConsumerEventSource consumerEventSource = new ConsumerEventSource(connection, destination);
consumerEventSource.setConsumerListener(consumerEvent -> {
});
consumerEventSource.start();
ProducerEventSource producerEventSource = new ProducerEventSource(connection, destination);
producerEventSource.setProducerListener(producerEvent -> {
});
producerEventSource.start();
//after that point several advisory addresses are created.
//make sure they are not accessible via management api.
addresses = serverControl.getAddressNames();
boolean hasInternalAddress = false;
for (String addr : addresses) {
hasInternalAddress = addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
if (hasInternalAddress) {
break;
}
}
assertEquals(!useDefault && supportAdvisory && !suppressJmx, hasInternalAddress);
consumerEventSource.stop();
producerEventSource.stop();
}
}
@Test
public void testHiddenInternalQueue() throws Exception {
server.createQueue(new QueueConfiguration(queueName1).setRoutingType(RoutingType.ANYCAST));
String[] queues = serverControl.getQueueNames();
assertEquals(1, queues.length);
for (String queue : queues) {
assertFalse(checkQueueFromInternalAddress(queue));
}
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName1.toString());
//this causes advisory queues to be created
session.createProducer(destination);
queues = serverControl.getQueueNames();
boolean hasInternal = false;
String targetQueue = null;
for (String queue : queues) {
hasInternal = checkQueueFromInternalAddress(queue);
if (hasInternal) {
targetQueue = queue;
break;
}
}
assertEquals("targetQueue: " + targetQueue, !useDefault && supportAdvisory && !suppressJmx, hasInternal);
}
}
private boolean checkQueueFromInternalAddress(String queue) throws JMSException, ActiveMQException {
try (Connection coreConn = coreCf.createConnection()) {
ActiveMQSession session = (ActiveMQSession) coreConn.createSession();
ClientSession coreSession = session.getCoreSession();
ClientSession.QueueQuery query = coreSession.queueQuery(new SimpleString(queue));
assertTrue("Queue doesn't exist: " + queue, query.isExists());
SimpleString qAddr = query.getAddress();
return qAddr.toString().startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
}
}
}