ARTEMIS-4498 Making queues always manageable

This commit is contained in:
Clebert Suconic 2024-04-09 16:26:48 -04:00 committed by clebertsuconic
parent 7702b39374
commit 162c4f6655
10 changed files with 14 additions and 35 deletions

View File

@ -509,7 +509,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
try {
server.registerQueueOnManagement(mirrorControlQueue, true);
server.registerQueueOnManagement(mirrorControlQueue);
} catch (Throwable ignored) {
logger.debug(ignored.getMessage(), ignored);
}

View File

@ -92,6 +92,8 @@ public class QueueFilterPredicate extends ActiveMQFilterPredicate<QueueControl>
return matches(queue.getScheduledCount());
case USER:
return matches(queue.getUser());
case INTERNAL_QUEUE:
return matches(queue.isInternalQueue());
default:
return true;
}

View File

@ -989,7 +989,7 @@ public interface ActiveMQServer extends ServiceComponent {
*/
void autoRemoveAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception;
void registerQueueOnManagement(Queue queue) throws Exception;
/**
* Remove an {@code AddressInfo} from the broker.

View File

@ -4000,8 +4000,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
/** Register a queue on the management registry */
@Override
public void registerQueueOnManagement(Queue queue, boolean registerInternal) throws Exception {
managementService.registerQueue(queue, queue.getAddress(), storageManager, registerInternal);
public void registerQueueOnManagement(Queue queue) throws Exception {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
@Override

View File

@ -101,8 +101,6 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void registerQueue(Queue queue, SimpleString address, StorageManager storageManager) throws Exception;
void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception;
void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception;
void registerAcceptor(Acceptor acceptor, TransportConfiguration configuration) throws Exception;

View File

@ -298,18 +298,6 @@ public class ManagementServiceImpl implements ManagementService {
public synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager) throws Exception {
registerQueue(queue, addressInfo, storageManager, false);
}
private synchronized void registerQueue(final Queue queue,
final AddressInfo addressInfo,
final StorageManager storageManager,
boolean forceInternal) throws Exception {
if (!forceInternal && (addressInfo.isInternal() || queue.isInternalQueue())) {
logger.debug("won't register internal queue: {}", queue);
return;
}
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), messagingServer, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
@ -332,14 +320,6 @@ public class ManagementServiceImpl implements ManagementService {
registerQueue(queue, new AddressInfo(address), storageManager);
}
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager,
final boolean forceInternal) throws Exception {
registerQueue(queue, new AddressInfo(address), storageManager, forceInternal);
}
@Override
public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);

View File

@ -269,11 +269,6 @@ public class ClusteredResetMockTest extends ServerTestBase {
}
@Override
public void registerQueue(Queue queue, SimpleString address, StorageManager storageManager, boolean forceInternal) throws Exception {
}
@Override
public void unregisterQueue(SimpleString name, SimpleString address, RoutingType routingType) throws Exception {

View File

@ -30,6 +30,10 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
Object[] queueControls = server.getJMSServerManager().getActiveMQServer().getManagementService().getResources(QueueControl.class);
for (Object o : queueControls) {
QueueControl c = (QueueControl) o;
if (c.isInternalQueue()) {
continue;
}
GroovyRun.assertTrue(c.getPersistentSize() > 0);
GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
GroovyRun.assertEquals(33l, c.getMessageCount());

View File

@ -3479,7 +3479,7 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertTrue(array.getJsonObject(1).getString("name").contains("my_queue"));
//test with an empty filter
filterString = createJsonFilter("", "", "");
filterString = createJsonFilter("internalQueue", "NOT_CONTAINS", "true");
queuesAsJsonString = serverControl.listQueues(filterString, 1, 50);
@ -3727,8 +3727,8 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
queuesAsJsonObject = JsonUtil.readJsonObject(queuesAsJsonString);
array = (JsonArray) queuesAsJsonObject.get("data");
Assert.assertEquals("number of queues returned from LESS_THAN query", 1, array.size());
Assert.assertEquals("correct queue returned from query", queueName4.toString(), array.getJsonObject(0).getString("name"));
Assert.assertEquals("number of queues returned from LESS_THAN query", 2, array.size());
Assert.assertEquals("correct queue returned from query", queueName4.toString(), array.getJsonObject(1).getString("name"));
//test with GREATER_THAN returns 2 queue
filterString = createJsonFilter("CONSUMER_COUNT", "GREATER_THAN", "2");

View File

@ -69,7 +69,7 @@ public class QueueConfigPersistenceTest extends ActiveMQTestBase {
server.start();
Queue queue = server.locateQueue(getName());
Assert.assertTrue(queue.isInternalQueue());
Assert.assertNull(server.getManagementService().getResource(ResourceNames.QUEUE + getName()));
Assert.assertNotNull(server.getManagementService().getResource(ResourceNames.QUEUE + getName()));
server.stop();
}