diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java index dbc1e890df..414def33de 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java @@ -147,6 +147,8 @@ public interface ClientSession extends XAResource, AutoCloseable { Boolean isExclusive(); Boolean isLastValue(); + + Integer getDefaultConsumerWindowSize(); } // Lifecycle operations ------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java index 8dc35a90f3..d377d182d3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java @@ -16,9 +16,9 @@ */ package org.apache.activemq.artemis.core.client.impl; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientSession; -import org.apache.activemq.artemis.api.core.RoutingType; public class QueueQueryImpl implements ClientSession.QueueQuery { @@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { private final Boolean lastValue; + private final Integer defaultConsumerWindowSize; + public QueueQueryImpl(final boolean durable, final boolean temporary, final int consumerCount, @@ -88,7 +90,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final boolean autoCreated, final boolean purgeOnNoConsumers, final RoutingType routingType) { - this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null); + this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null, null); } public QueueQueryImpl(final boolean durable, final boolean temporary, @@ -104,7 +106,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { final boolean purgeOnNoConsumers, final RoutingType routingType, final Boolean exclusive, - final Boolean lastValue) { + final Boolean lastValue, + final Integer defaultConsumerWindowSize) { this.durable = durable; this.temporary = temporary; this.consumerCount = consumerCount; @@ -120,6 +123,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { this.routingType = routingType; this.exclusive = exclusive; this.lastValue = lastValue; + this.defaultConsumerWindowSize = defaultConsumerWindowSize; } @Override @@ -197,5 +201,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery { return lastValue; } + @Override + public Integer getDefaultConsumerWindowSize() { + return defaultConsumerWindowSize; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index fccb041143..1268bbac40 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -16,9 +16,12 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; +import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; + import java.security.AccessController; import java.security.PrivilegedAction; import java.util.EnumSet; @@ -29,6 +32,10 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -37,6 +44,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; @@ -85,6 +93,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage; @@ -115,12 +124,6 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG; - public class ActiveMQSessionContext extends SessionContext { private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class); @@ -324,8 +327,9 @@ public class ActiveMQSessionContext extends SessionContext { // The actual windows size that gets used is determined by the user since // could be overridden on the queue settings // The value we send is just a hint + final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize; - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } @Override @@ -822,6 +826,16 @@ public class ActiveMQSessionContext extends SessionContext { } } + @Override + public int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException { + if (response instanceof SessionQueueQueryResponseMessage_V3) { + final Integer defaultConsumerWindowSize = ((SessionQueueQueryResponseMessage_V3) response).getDefaultConsumerWindowSize(); + return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + } else { + return ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + } + } + private Channel getCreateChannel() { return getCoreConnection().getChannel(1, -1); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java index b982744e04..0c4c40fd0a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java @@ -38,12 +38,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon protected Boolean lastValue; + protected Integer defaultConsumerWindowSize; + public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) { - this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue()); + this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getDefaultConsumerWindowSize()); } public SessionQueueQueryResponseMessage_V3() { - this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null); + this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null); } private SessionQueueQueryResponseMessage_V3(final SimpleString name, @@ -60,7 +62,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon final RoutingType routingType, final int maxConsumers, final Boolean exclusive, - final Boolean lastValue) { + final Boolean lastValue, + final Integer defaultConsumerWindowSize) { super(SESS_QUEUEQUERY_RESP_V3); this.durable = durable; @@ -92,6 +95,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.exclusive = exclusive; this.lastValue = lastValue; + + this.defaultConsumerWindowSize = defaultConsumerWindowSize; } public boolean isAutoCreated() { @@ -142,6 +147,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon this.lastValue = lastValue; } + public Integer getDefaultConsumerWindowSize() { + return defaultConsumerWindowSize; + } + + public void setDefaultConsumerWindowSize(Integer defaultConsumerWindowSize) { + this.defaultConsumerWindowSize = defaultConsumerWindowSize; + } + @Override public void encodeRest(final ActiveMQBuffer buffer) { super.encodeRest(buffer); @@ -151,6 +164,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon buffer.writeInt(maxConsumers); BufferHelper.writeNullableBoolean(buffer, exclusive); BufferHelper.writeNullableBoolean(buffer, lastValue); + BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize); } @Override @@ -164,6 +178,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon exclusive = BufferHelper.readNullableBoolean(buffer); lastValue = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer); + } } @Override @@ -176,6 +193,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon result = prime * result + maxConsumers; result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237); result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237); + result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode()); return result; } @@ -195,12 +213,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon buff.append(", maxConsumers=" + maxConsumers); buff.append(", exclusive=" + exclusive); buff.append(", lastValue=" + lastValue); + buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize); return buff.toString(); } @Override public ClientSession.QueueQuery toQueueQuery() { - return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue()); + return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getDefaultConsumerWindowSize()); } @Override @@ -226,6 +245,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon return false; } else if (!lastValue.equals(other.lastValue)) return false; + if (defaultConsumerWindowSize == null) { + if (other.defaultConsumerWindowSize != null) + return false; + } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize)) + return false; if (routingType == null) { if (other.routingType != null) return false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index 6497e3f99a..b09d310142 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -51,6 +51,8 @@ public class QueueQueryResult { private Boolean lastValue; + private Integer defaultConsumerWindowSize; + public QueueQueryResult(final SimpleString name, final SimpleString address, final boolean durable, @@ -65,7 +67,8 @@ public class QueueQueryResult { final RoutingType routingType, final int maxConsumers, final Boolean exclusive, - final Boolean lastValue) { + final Boolean lastValue, + final Integer defaultConsumerWindowSize) { this.durable = durable; this.temporary = temporary; @@ -95,6 +98,8 @@ public class QueueQueryResult { this.exclusive = exclusive; this.lastValue = lastValue; + + this.defaultConsumerWindowSize = defaultConsumerWindowSize; } public boolean isExists() { @@ -160,4 +165,8 @@ public class QueueQueryResult { public Boolean isLastValue() { return lastValue; } + + public Integer getDefaultConsumerWindowSize() { + return defaultConsumerWindowSize; + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 65713358d6..0d86354dd3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -325,6 +326,8 @@ public abstract class SessionContext { public abstract void resetMetadata(HashMap metaDataToSend); + public abstract int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException; + // Failover utility classes /** diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6c6a94cce8..dc5ba16320 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -241,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming"; + private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size"; + // Attributes ---------------------------------------------------- @@ -1068,6 +1070,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value); RoutingType routingType = RoutingType.valueOf(value); addressSettings.setDefaultAddressRoutingType(routingType); + } else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) { + addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child)); } } return setting; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 8c2f74885d..6d720886ce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -358,4 +358,6 @@ public interface ServerSession extends SecurityAuth { int getConsumerCount(); int getProducerCount(); + + int getDefaultConsumerWindowSize(SimpleString address); } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index a2cdf55b99..f01e0c7639 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -865,11 +865,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull(); } - boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues(); - boolean defaultPurgeOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultPurgeOnNoConsumers(); - int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers(); - boolean defaultExclusiveQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultExclusiveQueue(); - boolean defaultLastValueQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultLastValueQueue(); + final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString()); + + boolean autoCreateQueues = addressSettings.isAutoCreateQueues(); + boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers(); + int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers(); + boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue(); + boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue(); + int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize(); QueueQueryResult response; @@ -884,14 +887,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { SimpleString filterString = filter == null ? null : filter.getFilterString(); - response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue()); + response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), defaultConsumerWindowSize); } else if (name.equals(managementAddress)) { // make an exception for the management address (see HORNETQ-29) - response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false); + response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, defaultConsumerWindowSize); } else if (autoCreateQueues) { - response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue); + response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultConsumerWindowSize); } else { - response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null); + response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, defaultConsumerWindowSize); } return response; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 7388e568e2..c0bca6b666 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1904,4 +1904,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener { public int getProducerCount() { return getServerProducers().size(); } + + @Override + public int getDefaultConsumerWindowSize(SimpleString address) { + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return as.getDefaultConsumerWindowSize(); + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 6fc5019c17..f2eb488ae8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -21,6 +21,7 @@ import java.io.Serializable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.journal.EncodingSupport; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.settings.Mergeable; @@ -178,6 +179,8 @@ public class AddressSettings implements Mergeable, Serializable private RoutingType defaultAddressRoutingType = null; + private Integer defaultConsumerWindowSize = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -222,6 +225,7 @@ public class AddressSettings implements Mergeable, Serializable this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch; this.defaultQueueRoutingType = other.defaultQueueRoutingType; this.defaultAddressRoutingType = other.defaultAddressRoutingType; + this.defaultConsumerWindowSize = other.defaultConsumerWindowSize; } public AddressSettings() { @@ -579,6 +583,21 @@ public class AddressSettings implements Mergeable, Serializable return this; } + /** + * @return the defaultConsumerWindowSize + */ + public int getDefaultConsumerWindowSize() { + return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE; + } + + /** + * @param defaultConsumerWindowSize the defaultConsumerWindowSize to set + */ + public AddressSettings setDefaultConsumerWindowSize(int defaultConsumerWindowSize) { + this.defaultConsumerWindowSize = defaultConsumerWindowSize; + return this; + } + /** * merge 2 objects in to 1 * @@ -694,6 +713,9 @@ public class AddressSettings implements Mergeable, Serializable if (defaultExclusiveQueue == null) { defaultExclusiveQueue = merged.defaultExclusiveQueue; } + if (defaultConsumerWindowSize == null) { + defaultConsumerWindowSize = merged.defaultConsumerWindowSize; + } if (defaultLastValueQueue == null) { defaultLastValueQueue = merged.defaultLastValueQueue; } @@ -811,6 +833,10 @@ public class AddressSettings implements Mergeable, Serializable if (buffer.readableBytes() > 0) { defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer); } + + if (buffer.readableBytes() > 0) { + defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer); + } } @Override @@ -851,7 +877,8 @@ public class AddressSettings implements Mergeable, Serializable DataConstants.SIZE_BYTE + BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) + BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) + - BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch); + BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) + + BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize); } @Override @@ -932,6 +959,8 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch); + BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize); + } /* (non-Javadoc) @@ -980,6 +1009,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode()); result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode()); result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode()); + result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode()); return result; } @@ -1197,6 +1227,12 @@ public class AddressSettings implements Mergeable, Serializable return false; } else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch)) return false; + + if (defaultConsumerWindowSize == null) { + if (other.defaultConsumerWindowSize != null) + return false; + } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize)) + return false; return true; } @@ -1280,6 +1316,8 @@ public class AddressSettings implements Mergeable, Serializable defaultConsumersBeforeDispatch + ", defaultDelayBeforeDispatch=" + defaultDelayBeforeDispatch + + ", defaultClientWindowSize=" + + defaultConsumerWindowSize + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index e96923d0f4..04e29316aa 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3011,6 +3011,14 @@ + + + + the default window size for a consumer + + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 8fcac204cd..114779d318 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -346,6 +346,7 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers()); assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType()); assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType()); + assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize()); assertTrue(conf.getResourceLimitSettings().containsKey("myUser")); assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections()); diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index eb561fcdfe..186a712cb8 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -337,6 +337,7 @@ 15 MULTICAST ANYCAST + 10000 diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml index 9e1ca40944..443958e168 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml @@ -62,5 +62,6 @@ 15 MULTICAST ANYCAST + 10000 \ No newline at end of file diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index f73adf24a1..ac5e49bf2a 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -790,3 +790,8 @@ types](#routing-type). address if the broker is unable to determine the routing-type based on the client and/or protocol semantics. Default is `MULTICAST`. Read more about [routing types](#routing-type). + +`default-consumer-window-size` defines the default `consumerWindowSize` value +for a `CORE` protocol consumer, if not defined the default will be set to +1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size +if the value is not set on the client. Read more about [flow control](#flow-control). diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md index 44fbee3f16..015107973d 100644 --- a/docs/user-manual/en/flow-control.md +++ b/docs/user-manual/en/flow-control.md @@ -33,7 +33,7 @@ buffered on each consumer is determined by the `consumerWindowSize` parameter. By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024 -bytes). +bytes) unless overridden via ([Address Settings](address-model.md#configuring-addresses-and-queues-via-address-settings)) The value can be: diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java index d3adee0fe9..d4298e5087 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java @@ -24,7 +24,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; @@ -1386,4 +1389,65 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase { } } + @Test + public void testDefaultConsumerWindowSize() throws Exception { + ActiveMQServer messagingService = createServer(false, isNetty()); + + messagingService.start(); + messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false); + + ClientSessionFactory cf = createSessionFactory(locator); + ClientSession session = cf.createSession(false, true, true); + ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA); + + consumer.start(); + + assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE / 2, consumer.getClientWindowSize()); + } + + @Test + public void testConsumerWindowSizeAddressSettings() throws Exception { + ActiveMQServer messagingService = createServer(false, isNetty()); + + final int defaultConsumerWindowSize = 1024 * 5; + final AddressSettings settings = new AddressSettings(); + settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize); + messagingService.getConfiguration() + .getAddressesSettings().put(queueA.toString(), settings); + + messagingService.start(); + messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false); + + ClientSessionFactory cf = createSessionFactory(locator); + ClientSession session = cf.createSession(false, true, true); + ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA); + + session.start(); + + assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize()); + } + + @Test + public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception { + ActiveMQServer messagingService = createServer(false, isNetty()); + + final int defaultConsumerWindowSize = 1024 * 5; + final AddressSettings settings = new AddressSettings(); + settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize); + messagingService.getConfiguration() + .getAddressesSettings().put("#", settings); + + messagingService.start(); + messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false); + + ClientSessionFactory cf = createSessionFactory(locator); + ClientSession session = cf.createSession(false, true, true); + ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA); + ClientConsumerImpl consumer2 = (ClientConsumerImpl) session.createConsumer(queueA); + + session.start(); + + assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize()); + assertEquals(defaultConsumerWindowSize / 2, consumer2.getClientWindowSize()); + } }