This closes #1180
This commit is contained in:
commit
f5738243fd
|
@ -101,6 +101,9 @@ public class Create extends InputAbstract {
|
||||||
public static final String ETC_PING_TXT = "etc/ping-settings.txt";
|
public static final String ETC_PING_TXT = "etc/ping-settings.txt";
|
||||||
public static final String ETC_COMMENTED_PING_TXT = "etc/commented-ping-settings.txt";
|
public static final String ETC_COMMENTED_PING_TXT = "etc/commented-ping-settings.txt";
|
||||||
|
|
||||||
|
public static final String ETC_GLOBAL_MAX_SPECIFIED_TXT = "etc/global-max-specified.txt";
|
||||||
|
public static final String ETC_GLOBAL_MAX_DEFAULT_TXT = "etc/global-max-default.txt";
|
||||||
|
|
||||||
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
|
@Arguments(description = "The instance directory to hold the broker's configuration and data. Path must be writable.", required = true)
|
||||||
File directory;
|
File directory;
|
||||||
|
|
||||||
|
@ -251,8 +254,8 @@ public class Create extends InputAbstract {
|
||||||
@Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
|
@Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
|
||||||
boolean noJournalSync;
|
boolean noJournalSync;
|
||||||
|
|
||||||
@Option(name = "--global-max-size", description = "Maximum amount of memory which message data may consume (Default: 100Mb)")
|
@Option(name = "--global-max-size", description = "Maximum amount of memory which message data may consume (Default: Undefined, half of the system's memory)")
|
||||||
String globalMaxSize = "100Mb";
|
String globalMaxSize;
|
||||||
|
|
||||||
boolean IS_WINDOWS;
|
boolean IS_WINDOWS;
|
||||||
|
|
||||||
|
@ -661,7 +664,15 @@ public class Create extends InputAbstract {
|
||||||
filters.put("${user}", getUser());
|
filters.put("${user}", getUser());
|
||||||
filters.put("${password}", getPassword());
|
filters.put("${password}", getPassword());
|
||||||
filters.put("${role}", role);
|
filters.put("${role}", role);
|
||||||
|
|
||||||
|
|
||||||
|
if (globalMaxSize == null || globalMaxSize.trim().equals("")) {
|
||||||
|
filters.put("${global-max-section}", readTextFile(ETC_GLOBAL_MAX_DEFAULT_TXT));
|
||||||
|
} else {
|
||||||
filters.put("${global-max-size}", globalMaxSize);
|
filters.put("${global-max-size}", globalMaxSize);
|
||||||
|
filters.put("${global-max-section}", applyFilters(readTextFile(ETC_GLOBAL_MAX_SPECIFIED_TXT), filters));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (clustered) {
|
if (clustered) {
|
||||||
filters.put("${host}", getHostForClustered());
|
filters.put("${host}", getHostForClustered());
|
||||||
|
|
|
@ -25,7 +25,6 @@ import io.airlift.airline.Option;
|
||||||
import org.apache.activemq.artemis.cli.Artemis;
|
import org.apache.activemq.artemis.cli.Artemis;
|
||||||
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
|
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
|
||||||
import org.apache.activemq.artemis.components.ExternalComponent;
|
import org.apache.activemq.artemis.components.ExternalComponent;
|
||||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
|
||||||
import org.apache.activemq.artemis.dto.BrokerDTO;
|
import org.apache.activemq.artemis.dto.BrokerDTO;
|
||||||
import org.apache.activemq.artemis.dto.ComponentDTO;
|
import org.apache.activemq.artemis.dto.ComponentDTO;
|
||||||
import org.apache.activemq.artemis.factory.BrokerFactory;
|
import org.apache.activemq.artemis.factory.BrokerFactory;
|
||||||
|
@ -61,12 +60,8 @@ public class Run extends LockAbstract {
|
||||||
public Object execute(ActionContext context) throws Exception {
|
public Object execute(ActionContext context) throws Exception {
|
||||||
super.execute(context);
|
super.execute(context);
|
||||||
|
|
||||||
FileConfiguration fileConfiguration = getFileConfiguration();
|
|
||||||
|
|
||||||
Artemis.printBanner();
|
Artemis.printBanner();
|
||||||
|
|
||||||
createDirectories(getFileConfiguration());
|
|
||||||
|
|
||||||
BrokerDTO broker = getBrokerDTO();
|
BrokerDTO broker = getBrokerDTO();
|
||||||
|
|
||||||
addShutdownHook(broker.server.getConfigurationFile().getParentFile());
|
addShutdownHook(broker.server.getConfigurationFile().getParentFile());
|
||||||
|
@ -91,13 +86,6 @@ public class Run extends LockAbstract {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createDirectories(FileConfiguration fileConfiguration) {
|
|
||||||
fileConfiguration.getPagingLocation().mkdirs();
|
|
||||||
fileConfiguration.getJournalLocation().mkdirs();
|
|
||||||
fileConfiguration.getBindingsLocation().mkdirs();
|
|
||||||
fileConfiguration.getLargeMessagesLocation().mkdirs();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a simple shutdown hook to stop the server.
|
* Add a simple shutdown hook to stop the server.
|
||||||
*
|
*
|
||||||
|
|
|
@ -67,6 +67,8 @@ public class FileBroker implements Broker {
|
||||||
fileDeploymentManager.addDeployable(configuration).addDeployable(jmsConfiguration);
|
fileDeploymentManager.addDeployable(configuration).addDeployable(jmsConfiguration);
|
||||||
fileDeploymentManager.readConfiguration();
|
fileDeploymentManager.readConfiguration();
|
||||||
|
|
||||||
|
createDirectories(configuration);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a bit of a hack for backwards config compatibility since we no longer want to start the broker
|
* This is a bit of a hack for backwards config compatibility since we no longer want to start the broker
|
||||||
* using the JMSServerManager which would normally deploy JMS destinations. Here we take the JMS destination
|
* using the JMSServerManager which would normally deploy JMS destinations. Here we take the JMS destination
|
||||||
|
@ -112,6 +114,14 @@ public class FileBroker implements Broker {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void createDirectories(FileConfiguration fileConfiguration) {
|
||||||
|
fileConfiguration.getPagingLocation().mkdirs();
|
||||||
|
fileConfiguration.getJournalLocation().mkdirs();
|
||||||
|
fileConfiguration.getBindingsLocation().mkdirs();
|
||||||
|
fileConfiguration.getLargeMessagesLocation().mkdirs();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
stop(false);
|
stop(false);
|
||||||
|
|
|
@ -56,10 +56,7 @@ ${ping-config.settings}${journal-buffer.settings}${connector-config.settings}
|
||||||
that won't support flow control. -->
|
that won't support flow control. -->
|
||||||
<max-disk-usage>90</max-disk-usage>
|
<max-disk-usage>90</max-disk-usage>
|
||||||
|
|
||||||
<!-- the system will enter into page mode once you hit this limit.
|
${global-max-section}
|
||||||
This is an estimate in bytes of how much the messages are using in memory -->
|
|
||||||
<global-max-size>${global-max-size}</global-max-size>
|
|
||||||
|
|
||||||
<acceptors>
|
<acceptors>
|
||||||
|
|
||||||
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
<!-- the system will enter into page mode once you hit this limit.
|
||||||
|
This is an estimate in bytes of how much the messages are using in memory
|
||||||
|
|
||||||
|
The system will use half of the available memory (-Xmx) by default for the global-max-size.
|
||||||
|
You may specify a different value here if you need to customize it to your needs.
|
||||||
|
|
||||||
|
<global-max-size>100Mb</global-max-size>
|
||||||
|
|
||||||
|
-->
|
|
@ -0,0 +1,4 @@
|
||||||
|
<!-- the system will enter into page mode once you hit this limit.
|
||||||
|
This is an estimate in bytes of how much the messages are using in memory -->
|
||||||
|
<!--
|
||||||
|
<global-max-size>${global-max-size}</global-max-size>
|
|
@ -56,6 +56,8 @@ public class StreamClassPathTest {
|
||||||
openStream(Create.ETC_STOMP_ACCEPTOR_TXT);
|
openStream(Create.ETC_STOMP_ACCEPTOR_TXT);
|
||||||
openStream(Create.ETC_PING_TXT);
|
openStream(Create.ETC_PING_TXT);
|
||||||
openStream(Create.ETC_COMMENTED_PING_TXT);
|
openStream(Create.ETC_COMMENTED_PING_TXT);
|
||||||
|
openStream(Create.ETC_GLOBAL_MAX_SPECIFIED_TXT);
|
||||||
|
openStream(Create.ETC_GLOBAL_MAX_DEFAULT_TXT);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -435,7 +435,7 @@ public final class ActiveMQDefaultConfiguration {
|
||||||
// Default period to wait between configuration file checks
|
// Default period to wait between configuration file checks
|
||||||
public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
|
public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
|
||||||
|
|
||||||
public static final long DEFAULT_GLOBAL_MAX_SIZE = -1;
|
public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
|
||||||
|
|
||||||
public static final int DEFAULT_MAX_DISK_USAGE = 100;
|
public static final int DEFAULT_MAX_DISK_USAGE = 100;
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,10 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(ConfigurationImpl.class);
|
private static final Logger logger = Logger.getLogger(ConfigurationImpl.class);
|
||||||
|
|
||||||
|
// We want to turn of a few log.infos from the testsuite as they would be too verbose for tests
|
||||||
|
// Only the testsuite should set this one up
|
||||||
|
public static boolean TEST_MODE = false;
|
||||||
|
|
||||||
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
|
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
|
||||||
|
|
||||||
private static final long serialVersionUID = 4077088945050267843L;
|
private static final long serialVersionUID = 4077088945050267843L;
|
||||||
|
@ -253,7 +257,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
private long configurationFileRefreshPeriod = ActiveMQDefaultConfiguration.getDefaultConfigurationFileRefreshPeriod();
|
private long configurationFileRefreshPeriod = ActiveMQDefaultConfiguration.getDefaultConfigurationFileRefreshPeriod();
|
||||||
|
|
||||||
private long globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSize();
|
private Long globalMaxSize;
|
||||||
|
|
||||||
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
|
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
|
||||||
|
|
||||||
|
@ -351,6 +355,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getGlobalMaxSize() {
|
public long getGlobalMaxSize() {
|
||||||
|
if (globalMaxSize == null) {
|
||||||
|
this.globalMaxSize = ActiveMQDefaultConfiguration.getDefaultMaxGlobalSize();
|
||||||
|
if (!TEST_MODE) {
|
||||||
|
ActiveMQServerLogger.LOGGER.usingDefaultPaging(globalMaxSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
return globalMaxSize;
|
return globalMaxSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1792,7 +1802,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
||||||
if (journalDatasync != other.journalDatasync) {
|
if (journalDatasync != other.journalDatasync) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (globalMaxSize != other.globalMaxSize) {
|
if (globalMaxSize != null && !globalMaxSize.equals(other.globalMaxSize)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (maxDiskUsage != other.maxDiskUsage) {
|
if (maxDiskUsage != other.maxDiskUsage) {
|
||||||
|
|
|
@ -332,6 +332,13 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
void reloadingConfiguration(String module);
|
void reloadingConfiguration(String module);
|
||||||
|
|
||||||
|
@LogMessage(level = Logger.Level.INFO)
|
||||||
|
@Message(id = 221057, value = "Global Max Size is being adjusted to 1/2 of the JVM max size (-Xmx). being defined as {0}",
|
||||||
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
void usingDefaultPaging(long bytes);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@LogMessage(level = Logger.Level.WARN)
|
@LogMessage(level = Logger.Level.WARN)
|
||||||
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
|
||||||
format = Message.Format.MESSAGE_FORMAT)
|
format = Message.Format.MESSAGE_FORMAT)
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||||
|
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||||
import org.apache.activemq.artemis.core.filter.Filter;
|
import org.apache.activemq.artemis.core.filter.Filter;
|
||||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||||
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
|
||||||
|
@ -3121,6 +3122,7 @@ public class QueueImpl implements Queue {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkDeadLetterAddressAndExpiryAddress(final AddressSettings settings) {
|
private void checkDeadLetterAddressAndExpiryAddress(final AddressSettings settings) {
|
||||||
|
if (!ConfigurationImpl.TEST_MODE) {
|
||||||
if (settings.getDeadLetterAddress() == null) {
|
if (settings.getDeadLetterAddress() == null) {
|
||||||
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
|
ActiveMQServerLogger.LOGGER.AddressSettingsNoDLA(name);
|
||||||
}
|
}
|
||||||
|
@ -3128,6 +3130,7 @@ public class QueueImpl implements Queue {
|
||||||
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
|
ActiveMQServerLogger.LOGGER.AddressSettingsNoExpiryAddress(name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final class SlowConsumerReaperRunnable implements Runnable {
|
private final class SlowConsumerReaperRunnable implements Runnable {
|
||||||
|
|
||||||
|
|
|
@ -157,6 +157,10 @@ import org.junit.runner.Description;
|
||||||
*/
|
*/
|
||||||
public abstract class ActiveMQTestBase extends Assert {
|
public abstract class ActiveMQTestBase extends Assert {
|
||||||
|
|
||||||
|
static {
|
||||||
|
ConfigurationImpl.TEST_MODE = true;
|
||||||
|
}
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(ActiveMQTestBase.class);
|
private static final Logger logger = Logger.getLogger(ActiveMQTestBase.class);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
|
|
@ -61,7 +61,7 @@ Name | Description
|
||||||
[discovery-groups](clusters.md "Clusters") | [a list of discovery-group](#discovery-group-type)
|
[discovery-groups](clusters.md "Clusters") | [a list of discovery-group](#discovery-group-type)
|
||||||
[disk-scan-period](paging.md#max-disk-usage) | The interval where the disk is scanned for percentual usage. Default=5000 ms.
|
[disk-scan-period](paging.md#max-disk-usage) | The interval where the disk is scanned for percentual usage. Default=5000 ms.
|
||||||
[diverts](diverts.md "Diverting and Splitting Message Flows") | [a list of diverts to use](#divert-type)
|
[diverts](diverts.md "Diverting and Splitting Message Flows") | [a list of diverts to use](#divert-type)
|
||||||
[global-max-size](paging.md#global-max-size) | The amount in bytes before all addresses are considered full
|
[global-max-size](paging.md#global-max-size) | The amount in bytes before all addresses are considered full. Default is half of the memory used by the JVM (-Xmx argument).
|
||||||
[graceful-shutdown-enabled](graceful-shutdown.md "Graceful Server Shutdown") | true means that graceful shutdown is enabled. Default=true
|
[graceful-shutdown-enabled](graceful-shutdown.md "Graceful Server Shutdown") | true means that graceful shutdown is enabled. Default=true
|
||||||
[graceful-shutdown-timeout](graceful-shutdown.md "Graceful Server Shutdown") | Timeout on waitin for clients to disconnect before server shutdown. Default=-1
|
[graceful-shutdown-timeout](graceful-shutdown.md "Graceful Server Shutdown") | Timeout on waitin for clients to disconnect before server shutdown. Default=-1
|
||||||
[grouping-handler](message-grouping.md "Message Grouping") | Message Group configuration
|
[grouping-handler](message-grouping.md "Message Grouping") | Message Group configuration
|
||||||
|
|
|
@ -126,6 +126,8 @@ Beyond the max-size-bytes on the address you can also set the global-max-size on
|
||||||
|
|
||||||
When you have more messages than what is configured global-max-size any new produced message will make that destination to go through its paging policy.
|
When you have more messages than what is configured global-max-size any new produced message will make that destination to go through its paging policy.
|
||||||
|
|
||||||
|
global-max-size is calculated as half of the max memory available to the Java Virtual Machine, unless specified on the broker.xml configuration.
|
||||||
|
|
||||||
## Dropping messages
|
## Dropping messages
|
||||||
|
|
||||||
Instead of paging messages when the max size is reached, an address can
|
Instead of paging messages when the max size is reached, an address can
|
||||||
|
|
Loading…
Reference in New Issue