This commit is contained in:
Clebert Suconic 2017-06-29 12:56:16 -04:00
commit 43a7890b1a
5 changed files with 46 additions and 2 deletions

View File

@ -304,7 +304,7 @@ public final class ActiveMQDefaultConfiguration {
private static boolean DEFAULT_BRIDGE_DUPLICATE_DETECTION = true;
// Once the bridge has received this many bytes, it sends a confirmation
private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1048576;
private static int DEFAULT_BRIDGE_CONFIRMATION_WINDOW_SIZE = 1024 * 1024 * 10;
// Producer flow control
private static int DEFAULT_BRIDGE_PRODUCER_WINDOW_SIZE = -1;
@ -348,7 +348,7 @@ public final class ActiveMQDefaultConfiguration {
private static int DEFAULT_CLUSTER_MAX_HOPS = 1;
// The size (in bytes) of the window used for confirming data from the server connected to.
private static int DEFAULT_CLUSTER_CONFIRMATION_WINDOW_SIZE = 1048576;
private static int DEFAULT_CLUSTER_CONFIRMATION_WINDOW_SIZE = 1024 * 1024 * 10;
// How long to wait for a reply if in the middle of a fail-over. -1 means wait forever.
private static long DEFAULT_CLUSTER_CALL_FAILOVER_TIMEOUT = -1;

View File

@ -79,6 +79,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
private static final int DEFAULT_JMS_MESSAGE_SIZE = 1864;
private static final int RANGE_SIZE_MIN = 0;
private static final int RANGE_SZIE_MAX = 4;
private static final long serialVersionUID = 4077088945050267843L;
// Attributes -----------------------------------------------------------------------------
@ -2058,6 +2064,26 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public static boolean checkoutDupCacheSize(final int windowSize, final int idCacheSize) {
final int msgNumInFlight = windowSize / DEFAULT_JMS_MESSAGE_SIZE;
if (msgNumInFlight == 0) {
return true;
}
boolean sizeGood = false;
if (idCacheSize >= msgNumInFlight) {
int r = idCacheSize / msgNumInFlight;
// This setting is here to accomodate the current default setting.
if ( (r >= RANGE_SIZE_MIN) && (r <= RANGE_SZIE_MAX)) {
sizeGood = true;
}
}
return sizeGood;
}
/**
* It will find the right location of a subFolder, related to artemisInstance
*/

View File

@ -1590,4 +1590,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT)
void undeployQueue(SimpleString queueName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT)
void duplicateCacheSizeWarning(int idCacheSize, int confirmationWindowSize);
}

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@ -821,6 +822,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
if (start) {
bridge.start();
}
if ( !ConfigurationImpl.checkoutDupCacheSize(serverLocator.getConfirmationWindowSize(),server.getConfiguration().getIDCacheSize())) {
ActiveMQServerLogger.LOGGER.duplicateCacheSizeWarning(server.getConfiguration().getIDCacheSize(), serverLocator.getConfirmationWindowSize());
}
}
// Inner classes -----------------------------------------------------------------------------------

View File

@ -634,6 +634,15 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertTrue(brokerPlugins.get(1) instanceof EmptyPlugin2);
}
@Test
public void testDefaultConstraints() {
int defaultConfirmationWinSize = ActiveMQDefaultConfiguration.getDefaultClusterConfirmationWindowSize();
int defaultIdCacheSize = ActiveMQDefaultConfiguration.getDefaultIdCacheSize();
assertTrue("check failed, " + defaultConfirmationWinSize + ":" + defaultIdCacheSize, ConfigurationImpl.checkoutDupCacheSize(defaultConfirmationWinSize, defaultIdCacheSize));
defaultConfirmationWinSize = ActiveMQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize();
assertTrue("check failed, " + defaultConfirmationWinSize + ":" + defaultIdCacheSize, ConfigurationImpl.checkoutDupCacheSize(defaultConfirmationWinSize, defaultIdCacheSize));
}
@Override
protected Configuration createConfiguration() throws Exception {
FileConfiguration fc = new FileConfiguration();