From 7e06a2b1922f40447cd2eff4f7147e4b7056ae1e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 27 Feb 2018 11:36:51 -0500 Subject: [PATCH 1/4] ARTEMIS-1700 Using IOExecutors for more IO tasks --- .../paging/cursor/impl/PageCursorProviderImpl.java | 4 +++- .../impl/journal/AbstractJournalStorageManager.java | 12 ++++++------ .../impl/journal/JournalStorageManager.java | 4 ++-- .../core/replication/ReplicationEndpoint.java | 2 +- .../ShutdownOnCriticalIOErrorMoveNextTest.java | 2 +- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index cb8f01cc7f..2feb84925a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -533,7 +533,9 @@ public class PageCursorProviderImpl implements PageCursorProvider { cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1)); } - while (!storageManager.waitOnOperations(5000)) { + // we just need to make sure the storage is done.. + // if the thread pool is full, we will just log it once instead of looping + if (!storageManager.waitOnOperations(5000)) { ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext()); } } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 970c926277..6d9df082fa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -155,7 +155,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp protected BatchingIDGenerator idGenerator; - protected final ExecutorFactory ioExecutors; + protected final ExecutorFactory ioExecutorFactory; protected final ScheduledExecutorService scheduledExecutorService; @@ -197,15 +197,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp final CriticalAnalyzer analyzer, final ExecutorFactory executorFactory, final ScheduledExecutorService scheduledExecutorService, - final ExecutorFactory ioExecutors) { - this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, null); + final ExecutorFactory ioExecutorFactory) { + this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutorFactory, null); } public AbstractJournalStorageManager(Configuration config, CriticalAnalyzer analyzer, ExecutorFactory executorFactory, ScheduledExecutorService scheduledExecutorService, - ExecutorFactory ioExecutors, + ExecutorFactory ioExecutorFactory, IOCriticalErrorListener criticalErrorListener) { super(analyzer, CRITICAL_PATHS); @@ -213,7 +213,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp this.ioCriticalErrorListener = criticalErrorListener; - this.ioExecutors = ioExecutors; + this.ioExecutorFactory = ioExecutorFactory; this.scheduledExecutorService = scheduledExecutorService; @@ -1519,7 +1519,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp beforeStart(); - singleThreadExecutor = executorFactory.getExecutor(); + singleThreadExecutor = ioExecutorFactory.getExecutor(); bindingsJournal.start(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index c54a3b77b5..498e9d5d46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -129,7 +129,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); bindingsFF.setDatasync(config.isJournalDatasync()); - Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener); + Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener); bindingsJournal = localBindings; originalBindingsJournal = localBindings; @@ -184,7 +184,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { protected Journal createMessageJournal(Configuration config, IOCriticalErrorListener criticalErrorListener, int fileSize) { - return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener); + return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener); } // Life Cycle Handlers diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index f76f796eac..6e45a8c1af 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -262,7 +262,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING); } - pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository()); + pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository()); pageManager.start(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java index ebf7c0a0db..bbca7b8d03 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java @@ -99,7 +99,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase { protected Journal createMessageJournal(Configuration config, IOCriticalErrorListener criticalErrorListener, int fileSize) { - return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) { + return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) { @Override protected void moveNextFile(boolean scheduleReclaim) throws Exception { super.moveNextFile(scheduleReclaim); From bd0c80d47b0d807801c4b5a724d96911749c2b60 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 27 Feb 2018 12:49:46 -0500 Subject: [PATCH 2/4] NO-JIRA Adding information to why Joram tests eventually fail on CI --- .../java/org/objectweb/jtests/jms/framework/PubSubTestCase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java index d34002441d..a0591721ac 100644 --- a/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java +++ b/tests/joram-tests/src/test/java/org/objectweb/jtests/jms/framework/PubSubTestCase.java @@ -149,6 +149,7 @@ public abstract class PubSubTestCase extends JMSTestCase { admin.deleteTopicConnectionFactory(PubSubTestCase.TCF_NAME); admin.deleteTopic(PubSubTestCase.TOPIC_NAME); } catch (Exception ignored) { + ignored.printStackTrace(); } finally { publisherTopic = null; publisher = null; From c6028c779f25bd9c18b36924e99d640796364f0d Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 27 Feb 2018 13:06:35 -0500 Subject: [PATCH 3/4] NO-JIRA Adding known threads to stick around --- .../org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java index df86d741e7..1778c7b89d 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ThreadLeakCheckRule.java @@ -230,6 +230,8 @@ public class ThreadLeakCheckRule extends TestWatcher { if (threadName.contains("SunPKCS11")) { return true; + } else if (threadName.contains("Keep-Alive-Timer")) { + return true; } else if (threadName.contains("Attach Listener")) { return true; } else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) { From 158eb9d3bfb8ed16f40106dc37c630e9a3b3052e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 27 Feb 2018 15:50:52 -0500 Subject: [PATCH 4/4] NO-JIRA Using force option on deleteTopics and deleteQueues on JoramTests --- .../management/ActiveMQServerControl.java | 78 +++++++++---------- .../impl/ActiveMQServerControlImpl.java | 8 +- .../artemis/core/postoffice/PostOffice.java | 2 + .../core/postoffice/impl/PostOfficeImpl.java | 19 ++++- .../artemis/core/server/ActiveMQServer.java | 11 ++- .../core/server/impl/ActiveMQServerImpl.java | 8 +- .../ActiveMQServerControlUsingCoreTest.java | 5 ++ .../artemis/common/AbstractAdmin.java | 4 +- .../server/impl/fakes/FakePostOffice.java | 5 ++ 9 files changed, 93 insertions(+), 47 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index fc9de24c2d..cfde85ea4d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -438,7 +438,6 @@ public interface ActiveMQServerControl { @Attribute(desc = "global maximum limit for in-memory messages, in bytes") long getGlobalMaxSize(); - /** * Returns the memory used by all the addresses on broker for in-memory messages */ @@ -457,15 +456,19 @@ public interface ActiveMQServerControl { @Operation(desc = "create an address", impact = MBeanOperationInfo.ACTION) String createAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; + @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; @Operation(desc = "update an address", impact = MBeanOperationInfo.ACTION) String updateAddress(@Parameter(name = "name", desc = "The name of the address") String name, - @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; + @Parameter(name = "routingTypes", desc = "Comma separated list of Routing Types (anycast/multicast)") String routingTypes) throws Exception; @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception; + @Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION) + void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name, + @Parameter(name = "force", desc = "Force consumers and queues out") boolean force) throws Exception; + /** * Create a durable queue. *
@@ -481,7 +484,6 @@ public interface ActiveMQServerControl { void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, @Parameter(name = "name", desc = "Name of the queue") String name) throws Exception; - /** * Create a durable queue. *
@@ -489,8 +491,8 @@ public interface ActiveMQServerControl { *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * - * @param address address to bind the queue to - * @param name name of the queue + * @param address address to bind the queue to + * @param name name of the queue * @param routingType The routing type used for this address, MULTICAST or ANYCAST */ @Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION) @@ -498,7 +500,6 @@ public interface ActiveMQServerControl { @Parameter(name = "name", desc = "Name of the queue") String name, @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; - /** * Create a queue. *
@@ -523,9 +524,9 @@ public interface ActiveMQServerControl { *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * - * @param address address to bind the queue to - * @param name name of the queue - * @param durable whether the queue is durable + * @param address address to bind the queue to + * @param name name of the queue + * @param durable whether the queue is durable * @param routingType The routing type used for this address, MULTICAST or ANYCAST */ @Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION) @@ -559,10 +560,10 @@ public interface ActiveMQServerControl { *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * - * @param address address to bind the queue to - * @param name name of the queue - * @param filter of the queue - * @param durable whether the queue is durable + * @param address address to bind the queue to + * @param name name of the queue + * @param filter of the queue + * @param durable whether the queue is durable * @param routingType The routing type used for this address, MULTICAST or ANYCAST */ @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION) @@ -572,7 +573,6 @@ public interface ActiveMQServerControl { @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception; - /** * Create a queue. *
@@ -580,33 +580,33 @@ public interface ActiveMQServerControl { *
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * - * @param address address to bind the queue to - * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} - * @param name name of the queue - * @param filterStr filter of the queue - * @param durable is the queue durable? - * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param address address to bind the queue to + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param name name of the queue + * @param filterStr filter of the queue + * @param durable is the queue durable? + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time * @param purgeOnNoConsumers delete this queue when the last consumer disconnects - * @param autoCreateAddress create an address with default values should a matching address not be found + * @param autoCreateAddress create an address with default values should a matching address not be found * @return a textual summary of the queue * @throws Exception */ @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION) String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, - @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, - @Parameter(name = "name", desc = "Name of the queue") String name, - @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, - @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, - @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, - @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers, - @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; /** * Update a queue. * - * @param name name of the queue - * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} - * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param name name of the queue + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time * @param purgeOnNoConsumers delete this queue when the last consumer disconnects * @return a textual summary of the queue * @throws Exception @@ -616,13 +616,12 @@ public interface ActiveMQServerControl { @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers) throws Exception; - /** * Update a queue. * - * @param name name of the queue - * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} - * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param name name of the queue + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time * @param purgeOnNoConsumers delete this queue when the last consumer disconnects * @return a textual summary of the queue * @throws Exception @@ -633,7 +632,6 @@ public interface ActiveMQServerControl { @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers, @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) throws Exception; - /** * Deploy a durable queue. *
@@ -689,7 +687,6 @@ public interface ActiveMQServerControl { @Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers, @Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception; - /** * Enables message counters for this server. */ @@ -987,6 +984,7 @@ public interface ActiveMQServerControl { @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues, @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics, @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception; + /** * adds a new address setting for a specific address */ @@ -1213,8 +1211,8 @@ public interface ActiveMQServerControl { @Operation(desc = "Search for Consumers", impact = MBeanOperationInfo.INFO) String listConsumers(@Parameter(name = "Options") String options, - @Parameter(name = "Page Number") int page, - @Parameter(name = "Page Size") int pageSize) throws Exception; + @Parameter(name = "Page Number") int page, + @Parameter(name = "Page Size") int pageSize) throws Exception; @Operation(desc = "Search for Consumers", impact = MBeanOperationInfo.INFO) String listProducers(@Parameter(name = "Options") String options, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 73a731d516..d0f0b5cc63 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -722,13 +722,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active } } + @Override public void deleteAddress(String name) throws Exception { + deleteAddress(name, false); + } + + @Override + public void deleteAddress(String name, boolean force) throws Exception { checkStarted(); clearIO(); try { - server.removeAddressInfo(new SimpleString(name), null); + server.removeAddressInfo(new SimpleString(name), null, force); } catch (ActiveMQException e) { throw new IllegalStateException(e.getMessage()); } finally { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index 5d081a3291..19ddd94f39 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -58,6 +58,8 @@ public interface PostOffice extends ActiveMQComponent { AddressInfo removeAddressInfo(SimpleString address) throws Exception; + AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception; + AddressInfo getAddressInfo(SimpleString address); AddressInfo updateAddressInfo(SimpleString addressName, EnumSet routingTypes) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 3f9356c664..b2bfe37e07 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -544,16 +544,31 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } } + @Override public AddressInfo removeAddressInfo(SimpleString address) throws Exception { + return removeAddressInfo(address, false); + } + + @Override + public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception { synchronized (addressLock) { if (server.hasBrokerPlugins()) { server.callBrokerPlugins(plugin -> plugin.beforeRemoveAddress(address)); } final Bindings bindingsForAddress = getDirectBindings(address); - if (bindingsForAddress.getBindings().size() > 0) { - throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); + if (force) { + for (Binding binding : bindingsForAddress.getBindings()) { + if (binding instanceof QueueBinding) { + ((QueueBinding)binding).getQueue().deleteQueue(true); + } + } + + } else { + if (bindingsForAddress.getBindings().size() > 0) { + throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address); + } } managementService.unregisterAddress(address); final AddressInfo addressInfo = addressManager.removeAddressInfo(address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index b8598b8825..88c25a853b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -511,7 +511,6 @@ public interface ActiveMQServer extends ServiceComponent { * @throws Exception */ AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception; - /** * Remove an {@code AddressInfo} from the broker. * @@ -521,6 +520,16 @@ public interface ActiveMQServer extends ServiceComponent { */ void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception; + /** + * Remove an {@code AddressInfo} from the broker. + * + * @param address the {@code AddressInfo} to remove + * @param auth authorization information; {@code null} is valid + * @param force It will disconnect everything from the address including queues and consumers + * @throws Exception + */ + void removeAddressInfo(SimpleString address, SecurityAuth auth, boolean force) throws Exception; + String getInternalNamingPrefix(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index ad3dba80be..1f0dbd6a14 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2661,14 +2661,20 @@ public class ActiveMQServerImpl implements ActiveMQServer { return getAddressInfo(addressInfo.getName()); } + @Override public void removeAddressInfo(final SimpleString address, final SecurityAuth auth) throws Exception { + removeAddressInfo(address, auth, false); + } + + @Override + public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception { if (auth != null) { securityStore.check(address, CheckType.DELETE_ADDRESS, auth); } AddressInfo addressInfo = getAddressInfo(address); - if (postOffice.removeAddressInfo(address) == null) { + if (postOffice.removeAddressInfo(address, force) == null) { throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 16d64fd7cf..57ead701ac 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -150,6 +150,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes proxy.invokeOperation("deleteAddress", name); } + @Override + public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name, @Parameter(name = "force", desc = "Force everything out!") boolean force) throws Exception { + proxy.invokeOperation("deleteAddress", name, force); + } + @Override public void createQueue(final String address, final String name, diff --git a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java index 38fc739c75..70ce6be9b2 100644 --- a/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java +++ b/tests/joram-tests/src/test/java/org/apache/activemq/artemis/common/AbstractAdmin.java @@ -130,7 +130,7 @@ public class AbstractAdmin implements Admin { public void deleteQueue(final String name) { Boolean result; try { - invokeSyncOperation(ResourceNames.BROKER, "destroyQueue", name); + invokeSyncOperation(ResourceNames.BROKER, "destroyQueue", name, true); } catch (Exception e) { throw new IllegalStateException(e); } @@ -159,7 +159,7 @@ public class AbstractAdmin implements Admin { public void deleteTopic(final String name) { Boolean result; try { - invokeSyncOperation(ResourceNames.BROKER, "deleteAddress", name); + invokeSyncOperation(ResourceNames.BROKER, "deleteAddress", name, Boolean.TRUE); } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index e455e41635..6402bff2cf 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -95,6 +95,11 @@ public class FakePostOffice implements PostOffice { } + @Override + public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception { + return null; + } + @Override public boolean addAddressInfo(AddressInfo addressInfo) { return false;