ARTEMIS-1710 Allow management msgs to exceed global-max-size limit
This commit is contained in:
parent
27c2375ceb
commit
270b383e80
|
@ -241,11 +241,11 @@ public class DBOption extends OptionalLocking {
|
|||
storageManager, 1000L,
|
||||
scheduledExecutorService, executorFactory,
|
||||
false, null);
|
||||
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
|
||||
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
|
||||
} else {
|
||||
storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory);
|
||||
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null);
|
||||
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository);
|
||||
pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -486,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
try {
|
||||
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString());
|
||||
if (store.isRejectingMessages()) {
|
||||
if (store != null && store.isRejectingMessages()) {
|
||||
// We drop pre-settled messages (and abort any associated Tx)
|
||||
if (delivery.remotelySettled()) {
|
||||
if (transaction != null) {
|
||||
|
@ -585,7 +585,11 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
pagingManager.checkMemory(runnable);
|
||||
} else {
|
||||
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
|
||||
store.checkMemory(runnable);
|
||||
if (store != null) {
|
||||
store.checkMemory(runnable);
|
||||
} else {
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
|
|
|
@ -418,9 +418,13 @@ public class AMQSession implements SessionCallback {
|
|||
//non-persistent messages goes here, by default we stop reading from
|
||||
//transport
|
||||
connection.getTransportConnection().setAutoRead(false);
|
||||
if (!store.checkMemory(enableAutoReadAndTtl)) {
|
||||
enableAutoReadAndTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
if (store != null) {
|
||||
if (!store.checkMemory(enableAutoReadAndTtl)) {
|
||||
enableAutoReadAndTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
} else {
|
||||
enableAutoReadAndTtl.run();
|
||||
}
|
||||
|
||||
getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
|
@ -443,7 +447,7 @@ public class AMQSession implements SessionCallback {
|
|||
final AtomicInteger count,
|
||||
final org.apache.activemq.artemis.api.core.Message coreMsg,
|
||||
final SimpleString address) throws ResourceAllocationException {
|
||||
if (!store.checkMemory(false, () -> {
|
||||
final Runnable task = () -> {
|
||||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
|
@ -496,10 +500,15 @@ public class AMQSession implements SessionCallback {
|
|||
});
|
||||
}
|
||||
}
|
||||
})) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.enableTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
};
|
||||
if (store != null) {
|
||||
if (!store.checkMemory(false, task)) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.enableTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
} else {
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -201,17 +201,29 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
public long getNumberOfBytesPerPage() throws Exception {
|
||||
clearIO();
|
||||
try {
|
||||
return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes();
|
||||
final PagingStore pagingStore = getPagingStore();
|
||||
if (pagingStore == null) {
|
||||
return 0;
|
||||
}
|
||||
return pagingStore.getPageSizeBytes();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
private PagingStore getPagingStore() throws Exception {
|
||||
return pagingManager.getPageStore(addressInfo.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAddressSize() throws Exception {
|
||||
clearIO();
|
||||
try {
|
||||
return pagingManager.getPageStore(addressInfo.getName()).getAddressSize();
|
||||
final PagingStore pagingStore = getPagingStore();
|
||||
if (pagingStore == null) {
|
||||
return 0;
|
||||
}
|
||||
return pagingStore.getAddressSize();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
@ -240,7 +252,11 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
public boolean isPaging() throws Exception {
|
||||
clearIO();
|
||||
try {
|
||||
return pagingManager.getPageStore(addressInfo.getName()).isPaging();
|
||||
final PagingStore pagingStore = getPagingStore();
|
||||
if (pagingStore == null) {
|
||||
return false;
|
||||
}
|
||||
return pagingStore.isPaging();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
@ -250,12 +266,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
|
|||
public int getNumberOfPages() throws Exception {
|
||||
clearIO();
|
||||
try {
|
||||
PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName());
|
||||
final PagingStore pageStore = getPagingStore();
|
||||
|
||||
if (!pageStore.isPaging()) {
|
||||
if (pageStore == null || !pageStore.isPaging()) {
|
||||
return 0;
|
||||
} else {
|
||||
return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages();
|
||||
return pageStore.getNumberOfPages();
|
||||
}
|
||||
} finally {
|
||||
blockOnIO();
|
||||
|
|
|
@ -86,6 +86,8 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
|
||||
private ActiveMQScheduledComponent scheduledComponent = null;
|
||||
|
||||
private final SimpleString managementAddress;
|
||||
|
||||
// Static
|
||||
// --------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
|
@ -105,17 +107,25 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
|
||||
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final long maxSize) {
|
||||
final long maxSize,
|
||||
final SimpleString managementAddress) {
|
||||
pagingStoreFactory = pagingSPI;
|
||||
this.addressSettingsRepository = addressSettingsRepository;
|
||||
addressSettingsRepository.registerListener(this);
|
||||
this.maxSize = maxSize;
|
||||
this.memoryExecutor = pagingSPI.newExecutor();
|
||||
this.managementAddress = managementAddress;
|
||||
}
|
||||
|
||||
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
|
||||
this(pagingSPI, addressSettingsRepository, -1);
|
||||
this(pagingSPI, addressSettingsRepository, -1, null);
|
||||
}
|
||||
|
||||
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
|
||||
final SimpleString managementAddress) {
|
||||
this(pagingSPI, addressSettingsRepository, -1, managementAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -329,6 +339,9 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
*/
|
||||
@Override
|
||||
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
|
||||
if (managementAddress != null && storeName.startsWith(managementAddress)) {
|
||||
return null;
|
||||
}
|
||||
PagingStore store = stores.get(storeName);
|
||||
|
||||
if (store != null) {
|
||||
|
@ -438,6 +451,7 @@ public final class PagingManagerImpl implements PagingManager {
|
|||
}
|
||||
|
||||
private PagingStore newStore(final SimpleString address) throws Exception {
|
||||
assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress));
|
||||
syncLock.readLock().lock();
|
||||
try {
|
||||
PagingStore store = stores.get(address);
|
||||
|
|
|
@ -1244,6 +1244,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
if (queueInfo != null) {
|
||||
SimpleString address = queueInfo.getAddress();
|
||||
PagingStore store = pagingManager.getPageStore(address);
|
||||
if (store == null) {
|
||||
return null;
|
||||
}
|
||||
subs = store.getCursorProvider().getSubscription(queueID);
|
||||
pageSubscriptions.put(queueID, subs);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
|
|||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
@ -62,13 +61,10 @@ public final class BindingsImpl implements Bindings {
|
|||
|
||||
private final GroupingHandler groupingHandler;
|
||||
|
||||
private final PagingStore pageStore;
|
||||
|
||||
private final SimpleString name;
|
||||
|
||||
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore) {
|
||||
public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) {
|
||||
this.groupingHandler = groupingHandler;
|
||||
this.pageStore = pageStore;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
|
|
|
@ -1252,7 +1252,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
|
||||
PagingStore store = pagingManager.getPageStore(entry.getKey());
|
||||
|
||||
if (storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
|
||||
if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
|
||||
if (message.isLargeMessage()) {
|
||||
confirmLargeMessageSend(tx, message);
|
||||
}
|
||||
|
@ -1696,9 +1696,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
|
|||
}
|
||||
|
||||
@Override
|
||||
public Bindings createBindings(final SimpleString address) throws Exception {
|
||||
public Bindings createBindings(final SimpleString address) {
|
||||
GroupingHandler groupingHandler = server.getGroupingHandler();
|
||||
BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address));
|
||||
BindingsImpl bindings = new BindingsImpl(address, groupingHandler);
|
||||
if (groupingHandler != null) {
|
||||
groupingHandler.addListener(bindings);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.filter.FilterUtils;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
|
||||
|
||||
public final class QueueConfig {
|
||||
|
@ -202,7 +203,12 @@ public final class QueueConfig {
|
|||
final PageSubscription pageSubscription;
|
||||
if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) {
|
||||
try {
|
||||
pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable);
|
||||
final PagingStore pageStore = this.pagingManager.getPageStore(address);
|
||||
if (pageStore != null) {
|
||||
pageSubscription = pageStore.getCursorProvider().createSubscription(id, filter, durable);
|
||||
} else {
|
||||
pageSubscription = null;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
|
|
@ -2384,7 +2384,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
@Override
|
||||
public PagingManager createPagingManager() throws Exception {
|
||||
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
|
||||
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress());
|
||||
}
|
||||
|
||||
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
|
||||
|
|
|
@ -363,7 +363,7 @@ public class PostOfficeJournalLoader implements JournalLoader {
|
|||
// This can't be true!
|
||||
assert (perQueue != null);
|
||||
|
||||
if (store.checkPageFileExists(pageId.intValue())) {
|
||||
if (store != null && store.checkPageFileExists(pageId.intValue())) {
|
||||
// on this case we need to recalculate the records
|
||||
Page pg = store.createPage(pageId.intValue());
|
||||
pg.open();
|
||||
|
|
|
@ -154,7 +154,9 @@ public class ScaleDownHandler {
|
|||
|
||||
Transaction tx = new TransactionImpl(storageManager);
|
||||
|
||||
pageStore.disableCleanup();
|
||||
if (pageStore != null) {
|
||||
pageStore.disableCleanup();
|
||||
}
|
||||
|
||||
try {
|
||||
|
||||
|
@ -240,8 +242,10 @@ public class ScaleDownHandler {
|
|||
|
||||
return messageCount;
|
||||
} finally {
|
||||
pageStore.enableCleanup();
|
||||
pageStore.getCursorProvider().scheduleCleanup();
|
||||
if (pageStore != null) {
|
||||
pageStore.enableCleanup();
|
||||
pageStore.getCursorProvider().scheduleCleanup();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,6 +560,9 @@ public class ScaleDownHandler {
|
|||
public boolean lookup(MessageReference reference) throws Exception {
|
||||
|
||||
if (reference.isPaged()) {
|
||||
if (store == null) {
|
||||
return false;
|
||||
}
|
||||
PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID());
|
||||
if (subscription.contains((PagedReference) reference)) {
|
||||
return true;
|
||||
|
|
|
@ -1536,7 +1536,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
final SimpleString addr = removePrefix(address);
|
||||
PagingStore store = server.getPagingManager().getPageStore(addr);
|
||||
|
||||
if (!store.checkMemory(new Runnable() {
|
||||
if (store == null) {
|
||||
callback.sendProducerCreditsMessage(credits, address);
|
||||
} else if (!store.checkMemory(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callback.sendProducerCreditsMessage(credits, address);
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.io.PrintWriter;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl;
|
||||
|
@ -304,7 +305,7 @@ public class FileMoveManagerTest {
|
|||
|
||||
PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null);
|
||||
|
||||
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1);
|
||||
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress());
|
||||
|
||||
managerImpl.start();
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.policy;
|
||||
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
|
@ -153,7 +154,11 @@ public class DestinationProxy implements Destination {
|
|||
@Override
|
||||
public long getUsage() {
|
||||
try {
|
||||
return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize();
|
||||
final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
|
||||
if (pageStore == null) {
|
||||
return 0;
|
||||
}
|
||||
return pageStore.getAddressSize();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -221,9 +226,13 @@ public class DestinationProxy implements Destination {
|
|||
|
||||
@Override
|
||||
public int getPercentUsage() {
|
||||
long total = 0;
|
||||
final long total;
|
||||
try {
|
||||
total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize();
|
||||
final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress());
|
||||
if (pageStore == null) {
|
||||
return 0;
|
||||
}
|
||||
total = pageStore.getMaxSize();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -19,15 +19,23 @@ package org.apache.activemq.artemis.tests.integration.paging;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
|
@ -192,4 +200,91 @@ public class GlobalPagingTest extends PagingTest {
|
|||
session.commit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testManagementAddressCannotPageOrChangeGlobalSize() throws Exception {
|
||||
clearDataRecreateServerDirs();
|
||||
|
||||
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
|
||||
|
||||
final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1);
|
||||
|
||||
try {
|
||||
final SimpleString managementAddress = server.getConfiguration().getManagementAddress();
|
||||
server.getConfiguration().setGlobalMaxSize(1);
|
||||
server.start();
|
||||
|
||||
final ServerLocator locator = createInVMNonHALocator()
|
||||
.setBlockOnNonDurableSend(true)
|
||||
.setBlockOnDurableSend(true)
|
||||
.setBlockOnAcknowledge(true);
|
||||
|
||||
try (ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true)) {
|
||||
|
||||
session.start();
|
||||
|
||||
if (server.locateQueue(managementAddress) == null) {
|
||||
|
||||
session.createQueue(managementAddress, managementAddress, null, true);
|
||||
}
|
||||
|
||||
final Queue managementQueue = server.locateQueue(managementAddress);
|
||||
|
||||
Assert.assertNull(managementQueue.getPageSubscription());
|
||||
|
||||
Assert.assertNull(server.getPagingManager().getPageStore(managementAddress));
|
||||
|
||||
final SimpleString address = SimpleString.toSimpleString("queue");
|
||||
|
||||
if (server.locateQueue(address) == null) {
|
||||
|
||||
session.createQueue(address, address, null, true);
|
||||
}
|
||||
|
||||
final CountDownLatch startSendMessages = new CountDownLatch(1);
|
||||
|
||||
final PagingManager pagingManager = server.getPagingManager();
|
||||
|
||||
final long globalSize = pagingManager.getGlobalSize();
|
||||
|
||||
final Thread globalSizeChecker = new Thread(() -> {
|
||||
startSendMessages.countDown();
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
Assert.assertEquals(globalSize, pagingManager.getGlobalSize());
|
||||
}
|
||||
});
|
||||
|
||||
globalSizeChecker.start();
|
||||
|
||||
try (ClientRequestor requestor = new ClientRequestor(session, managementAddress)) {
|
||||
|
||||
ClientMessage message = session.createMessage(false);
|
||||
|
||||
ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount");
|
||||
|
||||
Assert.assertTrue("bodySize = " + message.getBodySize() + " must be > of globalMaxSize = " + server.getConfiguration().getGlobalMaxSize(), message.getBodySize() > server.getConfiguration().getGlobalMaxSize());
|
||||
|
||||
startSendMessages.await();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
try {
|
||||
ClientMessage reply = requestor.request(message);
|
||||
Assert.assertEquals(0L, ManagementHelper.getResult(reply));
|
||||
} catch (ActiveMQAddressFullException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
globalSizeChecker.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
server.stop(true);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -598,7 +598,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
final ExecutorFactory executorFactory,
|
||||
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception {
|
||||
|
||||
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository);
|
||||
PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress());
|
||||
|
||||
paging.start();
|
||||
return paging;
|
||||
|
|
|
@ -70,7 +70,7 @@ public class BindingsImplTest extends ActiveMQTestBase {
|
|||
private void internalTest(final boolean route) throws Exception {
|
||||
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
|
||||
|
||||
final Bindings bind = new BindingsImpl(null, null, null);
|
||||
final Bindings bind = new BindingsImpl(null, null);
|
||||
bind.addBinding(fake);
|
||||
bind.addBinding(new FakeBinding(new SimpleString("a")));
|
||||
bind.addBinding(new FakeBinding(new SimpleString("a")));
|
||||
|
|
Loading…
Reference in New Issue