ARTEMIS-581 Implement max disk usage, and global-max-size

max-disk-usage = how much of a disk we can use before the system blocks
global-max-size = how much bytes we can take from memory for messages before we start enter into the configured page mode

This will also change the default created configuration into page-mode as that's more reliable for systems.
This commit is contained in:
Clebert Suconic 2016-09-02 16:30:44 -04:00 committed by Martyn Taylor
parent bfc2095b61
commit 4472aa0e36
50 changed files with 1215 additions and 220 deletions

View File

@ -50,6 +50,17 @@ under the License.
<journal-pool-files>-1</journal-pool-files>
${journal-buffer.settings}
${connector-config.settings}
<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>
<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>
<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory -->
<global-max-size>104857600</global-max-size>
<acceptors>
<!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
<!-- performance tests have shown that openWire performs best with these buffer sizes -->
@ -78,9 +89,10 @@ ${cluster-security.settings}${cluster.settings}${replicated.settings}${shared-st
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
<address-full-policy>PAGE</address-full-policy>
</address-setting>
</address-settings>
</core>

View File

@ -429,6 +429,12 @@ public final class ActiveMQDefaultConfiguration {
// Default period to wait between configuration file checks
public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
public static final long DEFAULT_GLOBAL_MAX_SIZE = -1;
public static final int DEFAULT_MAX_DISK_USAGE = 100;
public static final int DEFAULT_DISK_SCAN = 5000;
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
@ -1144,4 +1150,17 @@ public final class ActiveMQDefaultConfiguration {
public static long getDefaultConfigurationFileRefreshPeriod() {
return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD;
}
/** The default global max size. -1 = no global max size. */
public static long getDefaultMaxGlobalSize() {
return DEFAULT_GLOBAL_MAX_SIZE;
}
public static int getDefaultMaxDiskUsage() {
return DEFAULT_MAX_DISK_USAGE;
}
public static int getDefaultDiskScanPeriod() {
return DEFAULT_DISK_SCAN;
}
}

View File

@ -219,4 +219,8 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
dataReceived = true;
}
@Override
public boolean isSupportsFlowControl() {
return true;
}
}

View File

@ -196,4 +196,12 @@ public interface RemotingConnection extends BufferHandler {
* @return
*/
boolean isSupportReconnect();
/**
* Return true if the protocol supports flow control.
* This is because in some cases we may need to hold message producers in cases like disk full.
* If the protocol doesn't support it we trash the connection and throw exceptions.
* @return
*/
boolean isSupportsFlowControl();
}

View File

@ -221,4 +221,9 @@ public class MQTTConnection implements RemotingConnection {
public boolean isSupportReconnect() {
return false;
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
}

View File

@ -738,4 +738,8 @@ public final class StompConnection implements RemotingConnection {
//unsupported
}
@Override
public boolean isSupportsFlowControl() {
return false;
}
}

View File

@ -561,10 +561,10 @@ public interface Configuration {
*/
Configuration setJournalCompactMinFiles(int minFiles);
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_FILES}.*/
int getJournalPoolFiles();
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_FILES}.*/
Configuration setJournalPoolFiles(int poolSize);
@ -968,4 +968,16 @@ public interface Configuration {
Configuration setConfigurationFileRefreshPeriod(long configurationFileRefreshPeriod);
long getGlobalMaxSize();
Configuration setGlobalMaxSize(long globalMaxSize);
int getMaxDiskUsage();
Configuration setMaxDiskUsage(int maxDiskUsage);
Configuration setDiskScanPeriod(int diskScanPeriod);
int getDiskScanPeriod();
}

View File

@ -246,6 +246,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
private long configurationFileRefreshPeriod = ActiveMQDefaultConfiguration.getDefaultConfigurationFileRefreshPeriod();
private long globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSize();
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
/**
* Parent folder for all data folders.
*/
@ -263,6 +269,28 @@ public class ConfigurationImpl implements Configuration, Serializable {
return persistenceEnabled;
}
@Override
public int getMaxDiskUsage() {
return maxDiskUsage;
}
@Override
public ConfigurationImpl setMaxDiskUsage(int maxDiskUsage) {
this.maxDiskUsage = maxDiskUsage;
return this;
}
@Override
public ConfigurationImpl setGlobalMaxSize(long maxSize) {
this.globalMaxSize = maxSize;
return this;
}
@Override
public long getGlobalMaxSize() {
return globalMaxSize;
}
@Override
public ConfigurationImpl setPersistenceEnabled(final boolean enable) {
persistenceEnabled = enable;
@ -1784,6 +1812,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public int getDiskScanPeriod() {
return diskScanPeriod;
}
@Override
public ConfigurationImpl setDiskScanPeriod(int diskScanPeriod) {
this.diskScanPeriod = diskScanPeriod;
return this;
}
/**
* It will find the right location of a subFolder, related to artemisInstance
*/

View File

@ -175,6 +175,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String MAX_QUEUES_NODE_NAME = "max-queues";
private static final String GLOBAL_MAX_SIZE = "global-max-size";
private static final String MAX_DISK_USAGE = "max-disk-usage";
private static final String DISK_SCAN_PERIOD = "disk-scan-period";
// Attributes ----------------------------------------------------
private boolean validateAIO = false;
@ -282,6 +288,12 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setConfigurationFileRefreshPeriod(getLong(e, "configuration-file-refresh-period", config.getConfigurationFileRefreshPeriod(), Validators.GT_ZERO));
config.setGlobalMaxSize(getLong(e, GLOBAL_MAX_SIZE, config.getGlobalMaxSize(), Validators.MINUS_ONE_OR_GT_ZERO));
config.setMaxDiskUsage(getInteger(e, MAX_DISK_USAGE, config.getMaxDiskUsage(), Validators.PERCENTAGE));
config.setDiskScanPeriod(getInteger(e, DISK_SCAN_PERIOD, config.getDiskScanPeriod(), Validators.MINUS_ONE_OR_GT_ZERO));
// parsing cluster password
String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);

View File

@ -20,6 +20,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
/**
@ -78,6 +79,10 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
void resumeCleanup();
void addBlockedStore(PagingStore store);
void injectMonitor(FileStoreMonitor monitor) throws Exception;
/**
* Lock the manager. This method should not be called during normal PagingManager usage.
*/
@ -89,4 +94,15 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
* @see #lock()
*/
void unlock();
/** Add size at the global count level.
* if totalSize > globalMaxSize it will return true */
PagingManager addSize(int size);
boolean isUsingGlobalSize();
boolean isGlobalFull();
boolean isDiskFull();
}

View File

@ -130,6 +130,9 @@ public interface PagingStore extends ActiveMQComponent {
boolean isRejectingMessages();
/** It will return true if the destination is leaving blocking. */
boolean checkReleasedMemory();
/**
* Write lock the PagingStore.
*

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -43,4 +44,6 @@ public interface PagingStoreFactory {
SequentialFileFactory newFileFactory(SimpleString address) throws Exception;
void injectMonitor(FileStoreMonitor monitor) throws Exception;
}

View File

@ -67,6 +67,8 @@ final class PageSyncTimer {
ctx.pageSyncLineUp();
if (!pendingSync) {
pendingSync = true;
// this is a single event
scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
}
syncOperations.add(ctx);

View File

@ -16,11 +16,14 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
import java.nio.file.FileStore;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -28,8 +31,11 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.jboss.logging.Logger;
public final class PagingManagerImpl implements PagingManager {
@ -46,14 +52,22 @@ public final class PagingManagerImpl implements PagingManager {
*/
private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final Set<PagingStore> blockedStored = new ConcurrentHashSet<>();
private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<>();
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
private final PagingStoreFactory pagingStoreFactory;
private final AtomicLong globalSizeBytes = new AtomicLong(0);
private final long maxSize;
private volatile boolean cleanupEnabled = true;
private volatile boolean diskFull = false;
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
// Static
@ -63,10 +77,21 @@ public final class PagingManagerImpl implements PagingManager {
// --------------------------------------------------------------------------------------------------------------------
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
final HierarchicalRepository<AddressSettings> addressSettingsRepository,
final long maxSize) {
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
addressSettingsRepository.registerListener(this);
this.maxSize = maxSize;
}
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) {
this(pagingSPI, addressSettingsRepository, -1);
}
public void addBlockedStore(PagingStore store) {
blockedStored.add(store);
}
@Override
@ -81,6 +106,72 @@ public final class PagingManagerImpl implements PagingManager {
}
}
@Override
public PagingManagerImpl addSize(int size) {
globalSizeBytes.addAndGet(size);
if (size < 0) {
checkMemoryRelease();
}
return this;
}
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
Iterator<PagingStore> storeIterator = blockedStored.iterator();
while (storeIterator.hasNext()) {
PagingStore store = storeIterator.next();
if (store.checkReleasedMemory()) {
storeIterator.remove();
}
}
}
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
pagingStoreFactory.injectMonitor(monitor);
monitor.addCallback(new LocalMonitor());
}
class LocalMonitor implements FileStoreMonitor.Callback {
@Override
public void tick(FileStore store, double usage) {
logger.tracef("Tick from store:: %s, usage at %f", store, usage);
}
@Override
public void over(FileStore store, double usage) {
if (!diskFull) {
ActiveMQServerLogger.LOGGER.diskBeyondCapacity();
diskFull = true;
}
}
@Override
public void under(FileStore store, double usage) {
if (diskFull) {
ActiveMQServerLogger.LOGGER.diskCapacityRestored();
diskFull = false;
checkMemoryRelease();
}
}
}
@Override
public boolean isDiskFull() {
return diskFull;
}
public boolean isUsingGlobalSize() {
return maxSize > 0;
}
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
}
@Override
public void disableCleanup() {
if (!cleanupEnabled) {

View File

@ -40,7 +40,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ExecutorFactory;
@ -95,6 +96,11 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
public void stop() {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
monitor.addStore(this.directory);
}
@Override
public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());

View File

@ -98,6 +98,8 @@ public class PagingStoreImpl implements PagingStore {
private final PagingManager pagingManager;
private final boolean usingGlobalMaxSize;
private final Executor executor;
// Bytes consumed by the queue on the memory
@ -176,6 +178,7 @@ public class PagingStoreImpl implements PagingStore {
this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor);
this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize();
}
/**
@ -242,7 +245,13 @@ public class PagingStoreImpl implements PagingStore {
@Override
public long getMaxSize() {
return maxSize;
if (maxSize < 0) {
// if maxSize < 0, we will return 2 pages for depage purposes
return pageSize * 2;
}
else {
return maxSize;
}
}
@Override
@ -626,7 +635,7 @@ public class PagingStoreImpl implements PagingStore {
}
private final Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
private final Queue<OverSizedRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<>();
private class MemoryFreedRunnablesExecutor implements Runnable {
@ -642,13 +651,14 @@ public class PagingStoreImpl implements PagingStore {
private final Runnable memoryFreedRunnablesExecutor = new MemoryFreedRunnablesExecutor();
private static final class OurRunnable implements Runnable {
// To be used when the memory is oversized either by local settings or global settings on blocking addresses
private static final class OverSizedRunnable implements Runnable {
private boolean ran;
private final Runnable runnable;
private OurRunnable(final Runnable runnable) {
private OverSizedRunnable(final Runnable runnable) {
this.runnable = runnable;
}
@ -664,9 +674,15 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1) {
if (sizeInBytes.get() > maxSize) {
OurRunnable ourRunnable = new OurRunnable(runWhenAvailable);
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
return false;
}
}
else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
OverSizedRunnable ourRunnable = new OverSizedRunnable(runWhenAvailable);
onMemoryFreedRunnables.add(ourRunnable);
@ -674,23 +690,24 @@ public class PagingStoreImpl implements PagingStore {
// has been added, but the check to execute was done before the element was added
// NOTE! We do not fix this race by locking the whole thing, doing this check provides
// MUCH better performance in a highly concurrent environment
if (sizeInBytes.get() <= maxSize) {
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) {
// run it now
ourRunnable.run();
}
else if (!blocking.get()) {
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(true);
else {
if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
pagingManager.addBlockedStore(this);
}
if (!blocking.get()) {
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(true);
}
}
return true;
}
}
else if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && maxSize != -1) {
if (sizeInBytes.get() > maxSize) {
return false;
}
}
runWhenAvailable.run();
@ -699,40 +716,48 @@ public class PagingStoreImpl implements PagingStore {
@Override
public void addSize(final int size) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
if (maxSize != -1) {
long newSize = sizeInBytes.addAndGet(size);
if (newSize <= maxSize) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
if (blocking.get()) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(false);
}
}
}
boolean globalFull = pagingManager.addSize(size).isGlobalFull();
long newSize = sizeInBytes.addAndGet(size);
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
checkReleaseMemory(globalFull, newSize);
}
return;
}
else if (addressFullMessagePolicy == AddressFullMessagePolicy.PAGE) {
final long addressSize = sizeInBytes.addAndGet(size);
if (size > 0) {
if (maxSize > 0 && addressSize > maxSize) {
if (maxSize != -1 && newSize > maxSize || globalFull) {
if (startPaging()) {
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, addressSize, maxSize);
ActiveMQServerLogger.LOGGER.pageStoreStart(storeName, newSize, maxSize);
}
}
}
return;
}
else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
sizeInBytes.addAndGet(size);
}
@Override
public boolean checkReleasedMemory() {
return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get());
}
public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(memoryFreedRunnablesExecutor);
if (blocking.get()) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, sizeInBytes.get(), maxSize);
blocking.set(false);
return true;
}
}
}
return false;
}
@Override
@ -1073,7 +1098,7 @@ public class PagingStoreImpl implements PagingStore {
// To be used on isDropMessagesWhenFull
@Override
public boolean isFull() {
return maxSize > 0 && getAddressSize() > maxSize;
return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
}
@Override

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -413,4 +414,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
* {@link org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl}
*/
void persistIdGenerator();
void injectMonitor(FileStoreMonitor monitor) throws Exception;
}

View File

@ -60,6 +60,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.jboss.logging.Logger;
@ -68,6 +69,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
private SequentialFileFactory journalFF;
private SequentialFileFactory bindingsFF;
SequentialFileFactory largeMessagesFactory;
private Journal originalMessageJournal;
@ -95,7 +98,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQMessageBundle.BUNDLE.invalidJournal();
}
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
@ -726,4 +729,11 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
readUnLock();
}
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
monitor.addStore(journalFF.getDirectory());
monitor.addStore(largeMessagesFactory.getDirectory());
monitor.addStore(bindingsFF.getDirectory());
}
}

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -83,6 +84,11 @@ public class NullStorageManager implements StorageManager {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
private static final OperationContext dummyContext = new OperationContext() {
@Override

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
@ -370,4 +371,8 @@ public interface ActiveMQMessageBundle {
@Message(id = 119118, value = "Management method not applicable for current server configuration")
IllegalStateException methodNotApplicable();
@Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.")
ActiveMQIOErrorException diskBeyondLimit();
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
/** This is for components with a scheduled at a fixed rate. */
public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, Runnable {
private static final Logger logger = Logger.getLogger(ActiveMQScheduledComponent.class);
private final ScheduledExecutorService scheduledExecutorService;
private long period;
private TimeUnit timeUnit;
private ScheduledFuture future;
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
long checkPeriod,
TimeUnit timeUnit) {
this.scheduledExecutorService = scheduledExecutorService;
this.period = checkPeriod;
this.timeUnit = timeUnit;
}
@Override
public synchronized void start() {
if (future != null) {
return;
}
if (period >= 0) {
future = scheduledExecutorService.scheduleWithFixedDelay(this, period, period, timeUnit);
}
else {
logger.tracef("did not start scheduled executor on %s because period was configured as %d", this, period);
}
}
public long getPeriod() {
return period;
}
public synchronized ActiveMQScheduledComponent setPeriod(long period) {
this.period = period;
restartIfNeeded();
return this;
}
public TimeUnit getTimeUnit() {
return timeUnit;
}
public synchronized ActiveMQScheduledComponent setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = timeUnit;
restartIfNeeded();
return this;
}
@Override
public synchronized void stop() {
if (future == null) {
return; // no big deal
}
future.cancel(false);
future = null;
}
@Override
public synchronized boolean isStarted() {
return future != null;
}
// this will restart the schedulped component upon changes
private void restartIfNeeded() {
if (isStarted()) {
stop();
start();
}
}
}

View File

@ -1242,6 +1242,16 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void impossibleToRouteGrouped();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222210, value = "Storage usage is beyond max-disk-usage. System will start blocking producers.",
format = Message.Format.MESSAGE_FORMAT)
void diskBeyondCapacity();
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222211, value = "Storage is back to stable now, under max-disk-usage.",
format = Message.Format.MESSAGE_FORMAT)
void diskCapacityRestored();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
package org.apache.activemq.artemis.core.server.files;
import java.io.File;
import java.io.FilenameFilter;

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.files;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.jboss.logging.Logger;
/** This will keep a list of fileStores. It will make a comparisson on all file stores registered. if any is over the limit,
* all Callbacks will be called with over.
*
* For instance: if Large Messages folder is registered on a different folder and it's over capacity,
* the whole system will be waiting it to be released.
* */
public class FileStoreMonitor extends ActiveMQScheduledComponent {
private static final Logger logger = Logger.getLogger(FileStoreMonitor.class);
private final Set<Callback> callbackList = new HashSet<>();
private final Set<FileStore> stores = new HashSet<>();
private double maxUsage;
public FileStoreMonitor(ScheduledExecutorService scheduledExecutorService,
long checkPeriod,
TimeUnit timeUnit,
double maxUsage) {
super(scheduledExecutorService, checkPeriod, timeUnit);
this.maxUsage = maxUsage;
}
public synchronized FileStoreMonitor addCallback(Callback callback) {
callbackList.add(callback);
return this;
}
public synchronized FileStoreMonitor addStore(File file) throws IOException {
if (file.exists()) {
addStore(Files.getFileStore(file.toPath()));
}
return this;
}
public synchronized FileStoreMonitor addStore(FileStore store) {
stores.add(store);
return this;
}
public void run() {
tick();
}
public synchronized void tick() {
boolean over = false;
FileStore lastStore = null;
double usage = 0;
for (FileStore store : stores) {
try {
lastStore = store;
usage = calculateUsage(store);
over = usage > maxUsage;
if (over) {
break;
}
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
for (Callback callback : callbackList) {
callback.tick(lastStore, usage);
if (over) {
callback.over(lastStore, usage);
}
else {
callback.under(lastStore, usage);
}
}
}
public double getMaxUsage() {
return maxUsage;
}
public FileStoreMonitor setMaxUsage(double maxUsage) {
this.maxUsage = maxUsage;
return this;
}
protected double calculateUsage(FileStore store) throws IOException {
return 1.0 - (double)store.getUsableSpace() / (double)store.getTotalSpace();
}
public interface Callback {
void tick(FileStore store, double usage);
void over(FileStore store, double usage);
void under(FileStore store, double usage);
}
}

View File

@ -123,6 +123,8 @@ import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.group.impl.LocalGroupingHandler;
@ -249,6 +251,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private ReloadManager reloadManager;
private FileStoreMonitor fileStoreMonitor;
/**
* This will be set by the JMS Queue Manager.
*/
@ -756,6 +760,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
state = SERVER_STATE.STOPPING;
if (fileStoreMonitor != null) {
fileStoreMonitor.stop();
fileStoreMonitor = null;
}
activation.sendLiveIsStopping();
stopComponent(connectorsService);
@ -1277,7 +1286,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SessionCallback callback,
OperationContext context,
boolean autoCreateJMSQueues) throws Exception {
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null);
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(),
xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(),
defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null,
pagingManager);
}
@Override
@ -1771,8 +1783,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
protected PagingManager createPagingManager() {
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository);
return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize());
}
protected PagingStoreFactoryNIO getPagingStoreFactory() {
@ -2042,6 +2053,25 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// We can only do this after everything is started otherwise we may get nasty races with expired messages
postOffice.startExpiryScanner();
}
try {
injectMonitor(new FileStoreMonitor(getScheduledPool(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f));
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
/** This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */
public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception {
this.fileStoreMonitor = storeMonitor;
pagingManager.injectMonitor(storeMonitor);
storageManager.injectMonitor(storeMonitor);
fileStoreMonitor.start();
}
public FileStoreMonitor getMonitor() {
return fileStoreMonitor;
}
public void completeActivation() throws Exception {
@ -2075,8 +2105,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
addressCount++;
}
long maxMemory = Runtime.getRuntime().maxMemory();
if (totalMaxSizeBytes >= maxMemory) {
if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) {
ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory);
}
}

View File

@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
@ -47,6 +48,7 @@ import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -128,6 +130,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected boolean xa;
protected final PagingManager pagingManager;
protected final StorageManager storageManager;
private final ResourceManager resourceManager;
@ -199,7 +203,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final SimpleString defaultAddress,
final SessionCallback callback,
final OperationContext context,
final QueueCreator queueCreator) throws Exception {
final QueueCreator queueCreator,
final PagingManager pagingManager) throws Exception {
this.username = username;
this.password = password;
@ -224,6 +229,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.securityStore = securityStore;
this.pagingManager = pagingManager;
timeoutSeconds = resourceManager.getTimeoutSeconds();
this.xa = xa;
@ -1249,6 +1256,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public RoutingStatus send(final ServerMessage message, final boolean direct, boolean noAutoCreateQueue) throws Exception {
// If the protocol doesn't support flow control, we have no choice other than fail the communication
if (!this.getRemotingConnection().isSupportsFlowControl() && pagingManager.isDiskFull()) {
ActiveMQIOErrorException exception = ActiveMQMessageBundle.BUNDLE.diskBeyondLimit();
this.getRemotingConnection().fail(exception);
throw exception;
}
RoutingStatus result = RoutingStatus.OK;
//large message may come from StompSession directly, in which
//case the id header already generated.

View File

@ -24,33 +24,25 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.jboss.logging.Logger;
public class ReloadManagerImpl implements ReloadManager {
public class ReloadManagerImpl extends ActiveMQScheduledComponent implements ReloadManager {
private static final Logger logger = Logger.getLogger(ReloadManagerImpl.class);
private final ScheduledExecutorService scheduledExecutorService;
private final long checkPeriod;
private ScheduledFuture future;
private volatile Runnable tick;
private Map<URL, ReloadRegistry> registry = new HashMap<>();
public ReloadManagerImpl(ScheduledExecutorService scheduledExecutorService, long checkPeriod) {
this.scheduledExecutorService = scheduledExecutorService;
this.checkPeriod = checkPeriod;
super(scheduledExecutorService, checkPeriod, TimeUnit.MILLISECONDS);
}
@Override
public synchronized void start() {
if (future != null) {
return;
}
future = scheduledExecutorService.scheduleWithFixedDelay(new ConfigurationFileReloader(), checkPeriod, checkPeriod, TimeUnit.MILLISECONDS);
public void run() {
tick();
}
@Override
@ -58,25 +50,9 @@ public class ReloadManagerImpl implements ReloadManager {
this.tick = tick;
}
@Override
public synchronized void stop() {
if (future == null) {
return; // no big deal
}
future.cancel(false);
future = null;
}
@Override
public synchronized boolean isStarted() {
return future != null;
}
@Override
public synchronized void addCallback(URL uri, ReloadCallback callback) {
if (future == null) {
if (!isStarted()) {
start();
}
ReloadRegistry uriRegistry = getRegistry(uri);
@ -104,20 +80,6 @@ public class ReloadManagerImpl implements ReloadManager {
return uriRegistry;
}
private final class ConfigurationFileReloader implements Runnable {
@Override
public void run() {
try {
tick();
}
catch (Exception e) {
ActiveMQServerLogger.LOGGER.configurationReloadFailed(e);
}
}
}
class ReloadRegistry {
private final File file;
private final URL uri;

View File

@ -676,6 +676,30 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="global-max-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Global Max Size before all addresses will enter into their Full Policy configured upon messages being produced.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="max-disk-usage" type="xsd:int" default="90" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Max percentage of disk usage before the system blocks or fail clients.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="disk-scan-period" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how often (in ms) to scan the disks for full disks.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="memory-warning-threshold" type="xsd:int" default="25" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -356,6 +356,9 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertFalse(a2Role.isCreateNonDurableQueue());
assertTrue(a2Role.isDeleteNonDurableQueue());
assertFalse(a2Role.isManage());
assertEquals(1234567, conf.getGlobalMaxSize());
assertEquals(37, conf.getMaxDiskUsage());
assertEquals(123, conf.getDiskScanPeriod());
}
@Test

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
package org.apache.activemq.artemis.core.server.files;
import java.io.BufferedReader;
import java.io.File;
@ -286,7 +286,7 @@ public class FileMoveManagerTest {
new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null,
new OrderedExecutorFactory(threadPool), true, null);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings);
PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1);
managerImpl.start();

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.files;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.FileStore;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class FileStoreMonitorTest extends ActiveMQTestBase {
private ScheduledExecutorService scheduledExecutorService;
@Before
public void startScheduled() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
}
@After
public void stopScheduled() {
scheduledExecutorService.shutdown();
scheduledExecutorService = null;
}
@Test
public void testSimpleTick() throws Exception {
File garbageFile = new File(getTestDirfile(), "garbage.bin");
FileOutputStream garbage = new FileOutputStream(garbageFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(garbage);
PrintStream out = new PrintStream(bufferedOutputStream);
// This is just to make sure there is at least something on the device.
// If the testsuite is running with an empty tempFS, it would return 0 and the assertion would fail.
for (int i = 0; i < 100; i++) {
out.println("Garbage " + i);
}
bufferedOutputStream.close();
final AtomicInteger over = new AtomicInteger(0);
final AtomicInteger under = new AtomicInteger(0);
final AtomicInteger tick = new AtomicInteger(0);
FileStoreMonitor.Callback callback = new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
tick.incrementAndGet();
System.out.println("tick:: " + store + " usage::" + usage);
}
@Override
public void over(FileStore store, double usage) {
over.incrementAndGet();
System.out.println("over:: " + store + " usage::" + usage);
}
@Override
public void under(FileStore store, double usage) {
under.incrementAndGet();
System.out.println("under:: " + store + " usage::" + usage);
}
};
final AtomicBoolean fakeReturn = new AtomicBoolean(false);
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 100, TimeUnit.MILLISECONDS, 0.999) {
@Override
protected double calculateUsage(FileStore store) throws IOException {
if (fakeReturn.get()) {
return 1f;
}
else {
return super.calculateUsage(store);
}
}
};
storeMonitor.addCallback(callback);
storeMonitor.addStore(getTestDirfile());
storeMonitor.tick();
Assert.assertEquals(0, over.get());
Assert.assertEquals(1, tick.get());
Assert.assertEquals(1, under.get());
fakeReturn.set(true);
storeMonitor.tick();
Assert.assertEquals(1, over.get());
Assert.assertEquals(2, tick.get());
Assert.assertEquals(1, under.get());
}
@Test
public void testScheduler() throws Exception {
FileStoreMonitor storeMonitor = new FileStoreMonitor(scheduledExecutorService, 20, TimeUnit.MILLISECONDS, 0.9);
final ReusableLatch latch = new ReusableLatch(5);
storeMonitor.addStore(getTestDirfile());
storeMonitor.addCallback(new FileStoreMonitor.Callback() {
@Override
public void tick(FileStore store, double usage) {
System.out.println("TickS::" + usage);
latch.countDown();
}
@Override
public void over(FileStore store, double usage) {
}
@Override
public void under(FileStore store, double usage) {
}
});
storeMonitor.start();
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
storeMonitor.stop();
latch.setCount(1);
Assert.assertFalse(latch.await(100, TimeUnit.MILLISECONDS));
// FileStoreMonitor monitor = new FileStoreMonitor()
}
}

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.group.impl.GroupBinding;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -299,6 +300,11 @@ public class TransactionImplTest extends ActiveMQTestBase {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
@Override
public void confirmPendingLargeMessage(long recordID) throws Exception {

View File

@ -1420,8 +1420,14 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
}
protected final ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
final long maxAddressSize) {
return createServer(realFiles, configuration, pageSize, maxAddressSize, (Map<String, AddressSettings>)null);
}
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
final long maxAddressSize,
@ -1458,20 +1464,20 @@ public abstract class ActiveMQTestBase extends Assert {
}
protected final ActiveMQServer createServer(final boolean realFiles, final boolean netty) throws Exception {
return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
return createServer(realFiles, createDefaultConfig(netty), AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createServer(final boolean realFiles, final Configuration configuration) {
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected final ActiveMQServer createServer(final Configuration configuration) {
return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
return createServer(configuration.isPersistenceEnabled(), configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createServer(final boolean realFiles, boolean isNetty, StoreConfiguration.StoreType storeType) throws Exception {
Configuration configuration = storeType == StoreConfiguration.StoreType.DATABASE ? createDefaultJDBCConfig(isNetty) : createDefaultConfig(isNetty);
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES, new HashMap<String, AddressSettings>());
return createServer(realFiles, configuration, AddressSettings.DEFAULT_PAGE_SIZE, AddressSettings.DEFAULT_MAX_SIZE_BYTES);
}
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
@ -1559,7 +1565,7 @@ public abstract class ActiveMQTestBase extends Assert {
final boolean realFiles,
final Map<String, Object> params) throws Exception {
String acceptor = isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY;
return createServer(realFiles, createDefaultConfig(index, params, acceptor), -1, -1, new HashMap<String, AddressSettings>());
return createServer(realFiles, createDefaultConfig(index, params, acceptor), -1, -1);
}
protected ActiveMQServer createClusteredServerWithParams(final boolean isNetty,
@ -1568,7 +1574,7 @@ public abstract class ActiveMQTestBase extends Assert {
final int pageSize,
final int maxAddressSize,
final Map<String, Object> params) throws Exception {
return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)), pageSize, maxAddressSize, new HashMap<String, AddressSettings>());
return createServer(realFiles, createDefaultConfig(index, params, (isNetty ? NETTY_ACCEPTOR_FACTORY : INVM_ACCEPTOR_FACTORY)), pageSize, maxAddressSize);
}
protected ServerLocator createFactory(final boolean isNetty) throws Exception {

View File

@ -53,6 +53,9 @@
<populate-validated-user>true</populate-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
<configuration-file-refresh-period>1234567</configuration-file-refresh-period>
<global-max-size>1234567</global-max-size>
<max-disk-usage>37</max-disk-usage>
<disk-scan-period>123</disk-scan-period>
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>

View File

@ -41,7 +41,9 @@ Name | Description
[create-bindings-dir](persistence.md "Configuring the bindings journal") | true means that the server will create the bindings directory on start up. Default=true
[create-journal-dir](persistence.md) | true means that the journal directory will be created. Default=true
[discovery-groups](clusters.md "Clusters") | [a list of discovery-group](#discovery-group-type)
[disk-scan-period](paging.md#max-disk-usage) | The interval where the disk is scanned for percentual usage. Default=5000 ms.
[diverts](diverts.md "Diverting and Splitting Message Flows") | [a list of diverts to use](#divert-type)
[global-max-size](paging.md#global-max-size) | The amount in bytes before all addresses are considered full
[graceful-shutdown-enabled](graceful-shutdown.md "Graceful Server Shutdown") | true means that graceful shutdown is enabled. Default=true
[graceful-shutdown-timeout](graceful-shutdown.md "Graceful Server Shutdown") | Timeout on waitin for clients to disconnect before server shutdown. Default=-1
[grouping-handler](message-grouping.md "Message Grouping") | Message Group configuration
@ -65,6 +67,7 @@ Name | Description
[management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications
[mask-password](configuration-index.md "Using Masked Passwords in Configuration Files") | This option controls whether passwords in server configuration need be masked. If set to "true" the passwords are masked. Default=false
[max-saved-replicated-journals-size]() | This specifies how many times a replicated backup server can restart after moving its files on start. Once there are this number of backup journal files the server will stop permanently after if fails back. Default=2
[max-disk-usage](paging.md#max-disk-usage) | The max percentage of data we should use from disks. The System will block while the disk is full. Default=100
[memory-measure-interval](perf-tuning.md) | frequency to sample JVM memory in ms (or -1 to disable memory sampling). Default=-1
[memory-warning-threshold](perf-tuning.md) | Percentage of available memory which will trigger a warning log. Default=25
[message-counter-enabled](management.md "Configuring Message Counters") | true means that message counters are enabled. Default=false

View File

@ -11,8 +11,7 @@ a low memory footprint.
Apache ActiveMQ Artemis will start paging messages to disk, when the size of all
messages in memory for an address exceeds a configured maximum size.
By default, Apache ActiveMQ Artemis does not page messages - this must be explicitly
configured to activate it.
The default configuration from Artemis has destinations with paging.
## Page Files
@ -121,6 +120,12 @@ This is the list of available parameters on the address settings.
</tbody>
</table>
## Global Max Size
Beyond the max-size-bytes on the address you can also set the global-max-size on the main configuration. If you set max-size-bytes = -1 on paging the global-max-size can still be used.
When you have more messages than what is configured global-max-size any new produced message will make that destination to go through its paging policy.
## Dropping messages
Instead of paging messages when the max size is reached, an address can
@ -181,6 +186,12 @@ In this example all the other 9 queues will be consuming messages from
the page system. This may cause performance issues if this is an
undesirable state.
## Max Disk Usage
The System will perform scans on the disk to determine if the disk is beyond a configured limit.
These are configured through 'max-disk-usage' in percentage. Once that limit is reached any
message will be blocked. (unless the protocol doesn't support flow control on which case there will be an exception thrown and the connection for those clients dropped).
## Example
See the [examples](examples.md) chapter for an example which shows how to use paging with Apache ActiveMQ Artemis.

View File

@ -588,7 +588,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
SessionCallback callback,
OperationContext context,
boolean autoCreateQueue) throws Exception {
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null, getPagingManager());
}
}

View File

@ -57,7 +57,7 @@ import org.junit.Test;
public class LargeMessageTest extends LargeMessageTestBase {
static final int RECEIVE_WAIT_TIME = 10000;
private static final int RECEIVE_WAIT_TIME = 10000;
private final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;

View File

@ -41,7 +41,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.BackupSyncDelay;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;

View File

@ -51,7 +51,7 @@ import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.impl.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
package org.apache.activemq.artemis.tests.integration.paging;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@ -49,11 +49,7 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
}
@Test
/**
* When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
*
* -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
*/ public void testBlockLogging() throws Exception {
public void testBlockLogging() throws Exception {
final int MAX_MESSAGES = 200;
final String MY_ADDRESS = "myAddress";
final String MY_QUEUE = "myQueue";
@ -64,6 +60,31 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.start();
internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
}
@Test
public void testGlobalBlockLogging() throws Exception {
final int MAX_MESSAGES = 200;
final String MY_ADDRESS = "myAddress";
final String MY_QUEUE = "myQueue";
ActiveMQServer server = createServer(false);
AddressSettings defaultSetting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
server.getConfiguration().setGlobalMaxSize(20 * 1024);
server.start();
internalTest(MAX_MESSAGES, MY_ADDRESS, MY_QUEUE, server);
}
private void internalTest(int MAX_MESSAGES,
String MY_ADDRESS,
String MY_QUEUE,
ActiveMQServer server) throws Exception {
ServerLocator locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
ClientSessionFactory factory = createSessionFactory(locator);

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class GlobalPagingTest extends PagingTest {
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
@Override
protected ActiveMQServer createServer(final boolean realFiles,
final Configuration configuration,
final long pageSize,
final long maxAddressSize,
final Map<String, AddressSettings> settings) {
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(configuration, realFiles));
if (settings != null) {
for (Map.Entry<String, AddressSettings> setting : settings.entrySet()) {
server.getAddressSettingsRepository().addMatch(setting.getKey(), setting.getValue());
}
}
server.getConfiguration().setGlobalMaxSize(maxAddressSize);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(pageSize).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
return server;
}
@Test
public void testPagingOverFullDisk() throws Exception {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.getConfiguration().setGlobalMaxSize(-1);
server.start();
ActiveMQServerImpl serverImpl = (ActiveMQServerImpl)server;
serverImpl.getMonitor().stop(); // stop the scheduled executor, we will do it manually only
serverImpl.getMonitor().tick();
final int numberOfMessages = 500;
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
final ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
final ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
final byte[] body = new byte[MESSAGE_SIZE];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= MESSAGE_SIZE; j++) {
bb.put(getSamplebyte(j));
}
Queue queue = server.locateQueue(ADDRESS);
queue.getPageSubscription().getPagingStore().forceAnotherPage();
sendFewMessages(numberOfMessages, session, producer, body);
serverImpl.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
serverImpl.getMonitor().tick();
Thread t = new Thread() {
public void run() {
try {
sendFewMessages(numberOfMessages, session, producer, body);
}
catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
t.join(1000);
Assert.assertTrue(t.isAlive());
// releasing the disk
serverImpl.getMonitor().setMaxUsage(1).tick();
t.join(5000);
Assert.assertFalse(t.isAlive());
session.start();
assertEquals(numberOfMessages * 2, getMessageCount(queue));
// The consumer has to be created after the getMessageCount(queue) assertion
// otherwise delivery could alter the messagecount and give us a false failure
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
ClientMessage msg = null;
for (int i = 0; i < numberOfMessages * 2; i++) {
msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
if (i % 500 == 0) {
session.commit();
}
}
session.commit();
assertEquals(0, getMessageCount(queue));
}
protected void sendFewMessages(int numberOfMessages,
ClientSession session,
ClientProducer producer,
byte[] body) throws ActiveMQException {
ClientMessage message;
for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}
session.commit();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
package org.apache.activemq.artemis.tests.integration.paging;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
package org.apache.activemq.artemis.tests.integration.paging;
import java.nio.ByteBuffer;
import java.util.HashMap;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
package org.apache.activemq.artemis.tests.integration.paging;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@ -88,18 +88,18 @@ public class PagingTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(PagingTest.class);
private ServerLocator locator;
private ActiveMQServer server;
private ClientSessionFactory sf;
protected ServerLocator locator;
protected ActiveMQServer server;
protected ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
protected static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private static final int RECEIVE_TIMEOUT = 5000;
protected static final int RECEIVE_TIMEOUT = 5000;
private static final int PAGE_MAX = 100 * 1024;
protected static final int PAGE_MAX = 100 * 1024;
private static final int PAGE_SIZE = 10 * 1024;
protected static final int PAGE_SIZE = 10 * 1024;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@ -118,11 +118,7 @@ public class PagingTest extends ActiveMQTestBase {
final int PAGE_SIZE = 10 * 1024;
HashMap<String, AddressSettings> map = new HashMap<>();
AddressSettings value = new AddressSettings();
map.put(ADDRESS.toString(), value);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.start();
final int numberOfBytes = 1024;
@ -158,7 +154,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
server.start();
sf = createSessionFactory(locator);
@ -171,7 +167,7 @@ public class PagingTest extends ActiveMQTestBase {
session.start();
for (int i = 0; i < 201; i++) {
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
ClientMessage message2 = consumer.receive(10000);
Assert.assertNotNull(message2);
@ -186,7 +182,7 @@ public class PagingTest extends ActiveMQTestBase {
else {
session.rollback();
for (int i = 0; i < 100; i++) {
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
ClientMessage message2 = consumer.receive(10000);
Assert.assertNotNull(message2);
@ -210,7 +206,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -330,7 +326,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -435,7 +431,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -629,7 +625,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0) // disable compact
.setMessageExpiryScanPeriod(500);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setExpiryAddress(new SimpleString("EXP")).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
@ -743,7 +739,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalDirectory(getJournalDir()).setJournalSyncNonTransactional(false).setJournalCompactMinFiles(0); // disable compact
ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -896,7 +892,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -951,7 +947,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator();
@ -1000,7 +996,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
waitForServerToStart(server);
@ -1075,7 +1071,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.getAddressSettingsRepository().getMatch("#").setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
server.start();
@ -1145,7 +1141,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1195,7 +1191,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator();
@ -1255,7 +1251,7 @@ public class PagingTest extends ActiveMQTestBase {
config.setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1305,7 +1301,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator();
@ -1358,7 +1354,7 @@ public class PagingTest extends ActiveMQTestBase {
// a dumb user, or anything that will remove the data
deleteDirectory(new File(getPageDir()));
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
@ -1391,7 +1387,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator();
@ -1434,7 +1430,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1511,7 +1507,7 @@ public class PagingTest extends ActiveMQTestBase {
jrn.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1548,7 +1544,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1629,7 +1625,7 @@ public class PagingTest extends ActiveMQTestBase {
}
}
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1678,7 +1674,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -1774,7 +1770,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
if (divert) {
DivertConfiguration divert1 = new DivertConfiguration().setName("dv1").setRoutingName("nm1").setAddress(PagingTest.ADDRESS.toString()).setForwardingAddress(PagingTest.ADDRESS.toString() + "-1").setExclusive(true);
@ -1876,7 +1872,7 @@ public class PagingTest extends ActiveMQTestBase {
locator.close();
}
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
@ -2011,7 +2007,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2067,7 +2063,7 @@ public class PagingTest extends ActiveMQTestBase {
locator.close();
}
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
ServerLocator locator1 = createInVMNonHALocator();
@ -2151,7 +2147,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2199,7 +2195,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
locator = createInVMNonHALocator();
@ -2264,7 +2260,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2374,7 +2370,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2402,6 +2398,7 @@ public class PagingTest extends ActiveMQTestBase {
int numberOfMessages = 0;
while (true) {
System.out.println("Sending message " + numberOfMessages);
message = session.createMessage(IS_DURABLE_MESSAGE);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty("id", numberOfMessages);
@ -2485,7 +2482,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2577,7 +2574,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalSyncTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2668,7 +2665,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalSyncTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE * 2, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE * 2);
server.getConfiguration();
server.getConfiguration();
@ -2777,7 +2774,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2825,7 +2822,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
sf = createSessionFactory(locator);
@ -2873,7 +2870,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -2924,7 +2921,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3167,7 +3164,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3210,7 +3207,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3240,7 +3237,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3277,7 +3274,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3305,7 +3302,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3535,7 +3532,7 @@ public class PagingTest extends ActiveMQTestBase {
int NUMBER_OF_MESSAGES = 2;
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3569,7 +3566,7 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
sf = createSessionFactory(locator);
@ -3610,7 +3607,7 @@ public class PagingTest extends ActiveMQTestBase {
public void testSyncPage() throws Exception {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3653,7 +3650,7 @@ public class PagingTest extends ActiveMQTestBase {
public void testSyncPageTX() throws Exception {
Configuration config = createDefaultInVMConfig();
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -3878,7 +3875,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setJournalFileSize(10 * 1024 * 1024);
server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
server = createServer(true, config, 512 * 1024, 1024 * 1024);
server.start();
@ -3980,7 +3977,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -4062,7 +4059,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -4158,7 +4155,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -4235,7 +4232,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -4959,7 +4956,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5020,7 +5017,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5198,7 +5195,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5292,7 +5289,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5376,7 +5373,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5460,7 +5457,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5533,7 +5530,7 @@ public class PagingTest extends ActiveMQTestBase {
public void testNoCursors() throws Exception {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();
@ -5570,7 +5567,7 @@ public class PagingTest extends ActiveMQTestBase {
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
server.start();

View File

@ -40,6 +40,8 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
@ -100,6 +102,46 @@ public class StompTest extends StompTestBase {
assertTrue(latch.await(60, TimeUnit.SECONDS));
}
@Test
public void testSendOverDiskFull() throws Exception {
AssertionLoggerHandler.startCapture();
try {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
sendFrame(frame);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
int count = 1000;
final CountDownLatch latch = new CountDownLatch(count);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
latch.countDown();
}
});
((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick();
frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
for (int i = 1; i <= count; i++) {
// Thread.sleep(1);
// System.out.println(">>> " + i);
sendFrame(frame);
}
// It should encounter the exception on logs
AssertionLoggerHandler.findText("AMQ119119");
}
finally {
AssertionLoggerHandler.clear();
AssertionLoggerHandler.stopCapture();
}
}
@Test
public void testConnect() throws Exception {

View File

@ -205,7 +205,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
config.addAcceptorConfiguration(allTransport);
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));

View File

@ -448,5 +448,10 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
public boolean isStarted() {
return false;
}
@Override
public boolean checkReleasedMemory() {
return true;
}
}
}

View File

@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -826,6 +827,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
public void stop() throws InterruptedException {
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
public void beforePageRead() throws Exception {
}

View File

@ -23,39 +23,25 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
public void activate() {
}
@Override
public void addBlockedStore(PagingStore store) {
public long addSize(final long size) {
return 0;
}
@Override
public void addTransaction(final PageTransactionInfo pageTransaction) {
}
public PagingStore createPageStore(final SimpleString destination) throws Exception {
return null;
}
public long getTotalMemory() {
return 0;
}
@Override
public SimpleString[] getStoreNames() {
return null;
}
public long getMaxMemory() {
return 0;
}
@Override
public PagingStore getPageStore(final SimpleString address) throws Exception {
return null;
@ -74,10 +60,6 @@ public final class FakePagingManager implements PagingManager {
return false;
}
public boolean isGlobalPageMode() {
return false;
}
public boolean isPaging(final SimpleString destination) throws Exception {
return false;
}
@ -92,6 +74,11 @@ public final class FakePagingManager implements PagingManager {
return false;
}
@Override
public FakePagingManager addSize(int size) {
return this;
}
@Override
public void reloadStores() throws Exception {
}
@ -101,13 +88,9 @@ public final class FakePagingManager implements PagingManager {
}
public void setGlobalPageMode(final boolean globalMode) {
}
public void setPostOffice(final PostOffice postOffice) {
}
public void resumeDepages() {
@Override
public boolean isUsingGlobalSize() {
return false;
}
public void sync(final Collection<SimpleString> destinationsToSync) throws Exception {
@ -126,10 +109,15 @@ public final class FakePagingManager implements PagingManager {
public void stop() throws Exception {
}
@Override
public boolean isDiskFull() {
return false;
}
/*
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
*/
* (non-Javadoc)
* @see org.apache.activemq.artemis.core.paging.PagingManager#isGlobalFull()
*/
public boolean isGlobalFull() {
return false;
}
@ -177,4 +165,8 @@ public final class FakePagingManager implements PagingManager {
// no-op
}
@Override
public void injectMonitor(FileStoreMonitor monitor) throws Exception {
}
}