This closes #1904
This commit is contained in:
commit
d80e469a67
|
@ -230,6 +230,8 @@ public class ThreadLeakCheckRule extends TestWatcher {
|
||||||
|
|
||||||
if (threadName.contains("SunPKCS11")) {
|
if (threadName.contains("SunPKCS11")) {
|
||||||
return true;
|
return true;
|
||||||
|
} else if (threadName.contains("Keep-Alive-Timer")) {
|
||||||
|
return true;
|
||||||
} else if (threadName.contains("Attach Listener")) {
|
} else if (threadName.contains("Attach Listener")) {
|
||||||
return true;
|
return true;
|
||||||
} else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
|
} else if ((javaVendor.contains("IBM") || isSystemThread) && threadName.equals("process reaper")) {
|
||||||
|
|
|
@ -438,7 +438,6 @@ public interface ActiveMQServerControl {
|
||||||
@Attribute(desc = "global maximum limit for in-memory messages, in bytes")
|
@Attribute(desc = "global maximum limit for in-memory messages, in bytes")
|
||||||
long getGlobalMaxSize();
|
long getGlobalMaxSize();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the memory used by all the addresses on broker for in-memory messages
|
* Returns the memory used by all the addresses on broker for in-memory messages
|
||||||
*/
|
*/
|
||||||
|
@ -466,6 +465,10 @@ public interface ActiveMQServerControl {
|
||||||
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
|
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
|
||||||
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
|
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception;
|
||||||
|
|
||||||
|
@Operation(desc = "delete an address", impact = MBeanOperationInfo.ACTION)
|
||||||
|
void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name,
|
||||||
|
@Parameter(name = "force", desc = "Force consumers and queues out") boolean force) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a durable queue.
|
* Create a durable queue.
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -481,7 +484,6 @@ public interface ActiveMQServerControl {
|
||||||
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
|
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
|
||||||
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
|
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a durable queue.
|
* Create a durable queue.
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -498,7 +500,6 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(name = "name", desc = "Name of the queue") String name,
|
@Parameter(name = "name", desc = "Name of the queue") String name,
|
||||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a queue.
|
* Create a queue.
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -572,7 +573,6 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
|
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
|
||||||
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
|
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a queue.
|
* Create a queue.
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -616,7 +616,6 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
|
||||||
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers) throws Exception;
|
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update a queue.
|
* Update a queue.
|
||||||
*
|
*
|
||||||
|
@ -633,7 +632,6 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
|
||||||
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) throws Exception;
|
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deploy a durable queue.
|
* Deploy a durable queue.
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -689,7 +687,6 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
|
@Parameter(name = "removeConsumers", desc = "Remove consumers of this queue") boolean removeConsumers,
|
||||||
@Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
|
@Parameter(name = "autoDeleteAddress", desc = "Automatically delete the address if this was the last queue") boolean autoDeleteAddress) throws Exception;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enables message counters for this server.
|
* Enables message counters for this server.
|
||||||
*/
|
*/
|
||||||
|
@ -987,6 +984,7 @@ public interface ActiveMQServerControl {
|
||||||
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
|
@Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues,
|
||||||
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
|
@Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics,
|
||||||
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
|
@Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* adds a new address setting for a specific address
|
* adds a new address setting for a specific address
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -722,13 +722,19 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deleteAddress(String name) throws Exception {
|
public void deleteAddress(String name) throws Exception {
|
||||||
|
deleteAddress(name, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteAddress(String name, boolean force) throws Exception {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
|
|
||||||
clearIO();
|
clearIO();
|
||||||
try {
|
try {
|
||||||
server.removeAddressInfo(new SimpleString(name), null);
|
server.removeAddressInfo(new SimpleString(name), null, force);
|
||||||
} catch (ActiveMQException e) {
|
} catch (ActiveMQException e) {
|
||||||
throw new IllegalStateException(e.getMessage());
|
throw new IllegalStateException(e.getMessage());
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -533,7 +533,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
|
||||||
cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
|
cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!storageManager.waitOnOperations(5000)) {
|
// we just need to make sure the storage is done..
|
||||||
|
// if the thread pool is full, we will just log it once instead of looping
|
||||||
|
if (!storageManager.waitOnOperations(5000)) {
|
||||||
ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext());
|
ActiveMQServerLogger.LOGGER.problemCompletingOperations(storageManager.getContext());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -155,7 +155,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
protected BatchingIDGenerator idGenerator;
|
protected BatchingIDGenerator idGenerator;
|
||||||
|
|
||||||
protected final ExecutorFactory ioExecutors;
|
protected final ExecutorFactory ioExecutorFactory;
|
||||||
|
|
||||||
protected final ScheduledExecutorService scheduledExecutorService;
|
protected final ScheduledExecutorService scheduledExecutorService;
|
||||||
|
|
||||||
|
@ -197,15 +197,15 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
final CriticalAnalyzer analyzer,
|
final CriticalAnalyzer analyzer,
|
||||||
final ExecutorFactory executorFactory,
|
final ExecutorFactory executorFactory,
|
||||||
final ScheduledExecutorService scheduledExecutorService,
|
final ScheduledExecutorService scheduledExecutorService,
|
||||||
final ExecutorFactory ioExecutors) {
|
final ExecutorFactory ioExecutorFactory) {
|
||||||
this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutors, null);
|
this(config, analyzer, executorFactory, scheduledExecutorService, ioExecutorFactory, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbstractJournalStorageManager(Configuration config,
|
public AbstractJournalStorageManager(Configuration config,
|
||||||
CriticalAnalyzer analyzer,
|
CriticalAnalyzer analyzer,
|
||||||
ExecutorFactory executorFactory,
|
ExecutorFactory executorFactory,
|
||||||
ScheduledExecutorService scheduledExecutorService,
|
ScheduledExecutorService scheduledExecutorService,
|
||||||
ExecutorFactory ioExecutors,
|
ExecutorFactory ioExecutorFactory,
|
||||||
IOCriticalErrorListener criticalErrorListener) {
|
IOCriticalErrorListener criticalErrorListener) {
|
||||||
super(analyzer, CRITICAL_PATHS);
|
super(analyzer, CRITICAL_PATHS);
|
||||||
|
|
||||||
|
@ -213,7 +213,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
this.ioCriticalErrorListener = criticalErrorListener;
|
this.ioCriticalErrorListener = criticalErrorListener;
|
||||||
|
|
||||||
this.ioExecutors = ioExecutors;
|
this.ioExecutorFactory = ioExecutorFactory;
|
||||||
|
|
||||||
this.scheduledExecutorService = scheduledExecutorService;
|
this.scheduledExecutorService = scheduledExecutorService;
|
||||||
|
|
||||||
|
@ -1519,7 +1519,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
beforeStart();
|
beforeStart();
|
||||||
|
|
||||||
singleThreadExecutor = executorFactory.getExecutor();
|
singleThreadExecutor = ioExecutorFactory.getExecutor();
|
||||||
|
|
||||||
bindingsJournal.start();
|
bindingsJournal.start();
|
||||||
|
|
||||||
|
|
|
@ -129,7 +129,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
||||||
bindingsFF.setDatasync(config.isJournalDatasync());
|
bindingsFF.setDatasync(config.isJournalDatasync());
|
||||||
|
|
||||||
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
|
Journal localBindings = new JournalImpl(ioExecutorFactory, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), bindingsFF, "activemq-bindings", "bindings", 1, 0, criticalErrorListener);
|
||||||
|
|
||||||
bindingsJournal = localBindings;
|
bindingsJournal = localBindings;
|
||||||
originalBindingsJournal = localBindings;
|
originalBindingsJournal = localBindings;
|
||||||
|
@ -184,7 +184,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
protected Journal createMessageJournal(Configuration config,
|
protected Journal createMessageJournal(Configuration config,
|
||||||
IOCriticalErrorListener criticalErrorListener,
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
int fileSize) {
|
int fileSize) {
|
||||||
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
|
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Life Cycle Handlers
|
// Life Cycle Handlers
|
||||||
|
|
|
@ -58,6 +58,8 @@ public interface PostOffice extends ActiveMQComponent {
|
||||||
|
|
||||||
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
AddressInfo removeAddressInfo(SimpleString address) throws Exception;
|
||||||
|
|
||||||
|
AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception;
|
||||||
|
|
||||||
AddressInfo getAddressInfo(SimpleString address);
|
AddressInfo getAddressInfo(SimpleString address);
|
||||||
|
|
||||||
AddressInfo updateAddressInfo(SimpleString addressName, EnumSet<RoutingType> routingTypes) throws Exception;
|
AddressInfo updateAddressInfo(SimpleString addressName, EnumSet<RoutingType> routingTypes) throws Exception;
|
||||||
|
|
|
@ -544,17 +544,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
public AddressInfo removeAddressInfo(SimpleString address) throws Exception {
|
||||||
|
return removeAddressInfo(address, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception {
|
||||||
synchronized (addressLock) {
|
synchronized (addressLock) {
|
||||||
if (server.hasBrokerPlugins()) {
|
if (server.hasBrokerPlugins()) {
|
||||||
server.callBrokerPlugins(plugin -> plugin.beforeRemoveAddress(address));
|
server.callBrokerPlugins(plugin -> plugin.beforeRemoveAddress(address));
|
||||||
}
|
}
|
||||||
|
|
||||||
final Bindings bindingsForAddress = getDirectBindings(address);
|
final Bindings bindingsForAddress = getDirectBindings(address);
|
||||||
|
if (force) {
|
||||||
|
for (Binding binding : bindingsForAddress.getBindings()) {
|
||||||
|
if (binding instanceof QueueBinding) {
|
||||||
|
((QueueBinding)binding).getQueue().deleteQueue(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
if (bindingsForAddress.getBindings().size() > 0) {
|
if (bindingsForAddress.getBindings().size() > 0) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
|
throw ActiveMQMessageBundle.BUNDLE.addressHasBindings(address);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
managementService.unregisterAddress(address);
|
managementService.unregisterAddress(address);
|
||||||
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
|
final AddressInfo addressInfo = addressManager.removeAddressInfo(address);
|
||||||
if (server.hasBrokerPlugins()) {
|
if (server.hasBrokerPlugins()) {
|
||||||
|
|
|
@ -262,7 +262,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING);
|
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly(JournalState.SYNCING);
|
||||||
}
|
}
|
||||||
|
|
||||||
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository());
|
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), config.getJournalBufferTimeout_NIO(), server.getScheduledPool(), server.getIOExecutorFactory(), config.isJournalSyncNonTransactional(), criticalErrorListener), server.getAddressSettingsRepository());
|
||||||
|
|
||||||
pageManager.start();
|
pageManager.start();
|
||||||
|
|
||||||
|
|
|
@ -511,7 +511,6 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remove an {@code AddressInfo} from the broker.
|
* Remove an {@code AddressInfo} from the broker.
|
||||||
*
|
*
|
||||||
|
@ -521,6 +520,16 @@ public interface ActiveMQServer extends ServiceComponent {
|
||||||
*/
|
*/
|
||||||
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
|
void removeAddressInfo(SimpleString address, SecurityAuth auth) throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove an {@code AddressInfo} from the broker.
|
||||||
|
*
|
||||||
|
* @param address the {@code AddressInfo} to remove
|
||||||
|
* @param auth authorization information; {@code null} is valid
|
||||||
|
* @param force It will disconnect everything from the address including queues and consumers
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
void removeAddressInfo(SimpleString address, SecurityAuth auth, boolean force) throws Exception;
|
||||||
|
|
||||||
String getInternalNamingPrefix();
|
String getInternalNamingPrefix();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2661,14 +2661,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
return getAddressInfo(addressInfo.getName());
|
return getAddressInfo(addressInfo.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth) throws Exception {
|
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth) throws Exception {
|
||||||
|
removeAddressInfo(address, auth, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeAddressInfo(final SimpleString address, final SecurityAuth auth, boolean force) throws Exception {
|
||||||
if (auth != null) {
|
if (auth != null) {
|
||||||
securityStore.check(address, CheckType.DELETE_ADDRESS, auth);
|
securityStore.check(address, CheckType.DELETE_ADDRESS, auth);
|
||||||
}
|
}
|
||||||
|
|
||||||
AddressInfo addressInfo = getAddressInfo(address);
|
AddressInfo addressInfo = getAddressInfo(address);
|
||||||
if (postOffice.removeAddressInfo(address) == null) {
|
if (postOffice.removeAddressInfo(address, force) == null) {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
|
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
|
||||||
protected Journal createMessageJournal(Configuration config,
|
protected Journal createMessageJournal(Configuration config,
|
||||||
IOCriticalErrorListener criticalErrorListener,
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
int fileSize) {
|
int fileSize) {
|
||||||
return new JournalImpl(ioExecutors, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
|
return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener) {
|
||||||
@Override
|
@Override
|
||||||
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
|
protected void moveNextFile(boolean scheduleReclaim) throws Exception {
|
||||||
super.moveNextFile(scheduleReclaim);
|
super.moveNextFile(scheduleReclaim);
|
||||||
|
|
|
@ -150,6 +150,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
||||||
proxy.invokeOperation("deleteAddress", name);
|
proxy.invokeOperation("deleteAddress", name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name, @Parameter(name = "force", desc = "Force everything out!") boolean force) throws Exception {
|
||||||
|
proxy.invokeOperation("deleteAddress", name, force);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createQueue(final String address,
|
public void createQueue(final String address,
|
||||||
final String name,
|
final String name,
|
||||||
|
|
|
@ -130,7 +130,7 @@ public class AbstractAdmin implements Admin {
|
||||||
public void deleteQueue(final String name) {
|
public void deleteQueue(final String name) {
|
||||||
Boolean result;
|
Boolean result;
|
||||||
try {
|
try {
|
||||||
invokeSyncOperation(ResourceNames.BROKER, "destroyQueue", name);
|
invokeSyncOperation(ResourceNames.BROKER, "destroyQueue", name, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
|
@ -159,7 +159,7 @@ public class AbstractAdmin implements Admin {
|
||||||
public void deleteTopic(final String name) {
|
public void deleteTopic(final String name) {
|
||||||
Boolean result;
|
Boolean result;
|
||||||
try {
|
try {
|
||||||
invokeSyncOperation(ResourceNames.BROKER, "deleteAddress", name);
|
invokeSyncOperation(ResourceNames.BROKER, "deleteAddress", name, Boolean.TRUE);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IllegalStateException(e);
|
throw new IllegalStateException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,6 +149,7 @@ public abstract class PubSubTestCase extends JMSTestCase {
|
||||||
admin.deleteTopicConnectionFactory(PubSubTestCase.TCF_NAME);
|
admin.deleteTopicConnectionFactory(PubSubTestCase.TCF_NAME);
|
||||||
admin.deleteTopic(PubSubTestCase.TOPIC_NAME);
|
admin.deleteTopic(PubSubTestCase.TOPIC_NAME);
|
||||||
} catch (Exception ignored) {
|
} catch (Exception ignored) {
|
||||||
|
ignored.printStackTrace();
|
||||||
} finally {
|
} finally {
|
||||||
publisherTopic = null;
|
publisherTopic = null;
|
||||||
publisher = null;
|
publisher = null;
|
||||||
|
|
|
@ -95,6 +95,11 @@ public class FakePostOffice implements PostOffice {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public AddressInfo removeAddressInfo(SimpleString address, boolean force) throws Exception {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean addAddressInfo(AddressInfo addressInfo) {
|
public boolean addAddressInfo(AddressInfo addressInfo) {
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue