ARTEMIS-1987 - Add consumer window size to AddressSettings

Support configuring a default consumer window size via AddressSettings
which will allow sensible defaults to be used by address type
This commit is contained in:
Christopher L. Shannon (cshannon) 2018-07-25 06:42:29 -04:00 committed by Clebert Suconic
parent 587e45594c
commit 5fc60d7437
18 changed files with 222 additions and 29 deletions

View File

@ -147,6 +147,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Boolean isExclusive();
Boolean isLastValue();
Integer getDefaultConsumerWindowSize();
}
// Lifecycle operations ------------------------------------------

View File

@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.RoutingType;
public class QueueQueryImpl implements ClientSession.QueueQuery {
@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean lastValue;
private final Integer defaultConsumerWindowSize;
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
@ -88,7 +90,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final boolean autoCreated,
final boolean purgeOnNoConsumers,
final RoutingType routingType) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null, null);
}
public QueueQueryImpl(final boolean durable,
final boolean temporary,
@ -104,7 +106,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final boolean purgeOnNoConsumers,
final RoutingType routingType,
final Boolean exclusive,
final Boolean lastValue) {
final Boolean lastValue,
final Integer defaultConsumerWindowSize) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@ -120,6 +123,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.routingType = routingType;
this.exclusive = exclusive;
this.lastValue = lastValue;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@Override
@ -197,5 +201,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
return lastValue;
}
@Override
public Integer getDefaultConsumerWindowSize() {
return defaultConsumerWindowSize;
}
}

View File

@ -16,9 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.EnumSet;
@ -29,6 +32,10 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -37,6 +44,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
@ -85,6 +93,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@ -115,12 +124,6 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
public class ActiveMQSessionContext extends SessionContext {
private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
@ -324,8 +327,9 @@ public class ActiveMQSessionContext extends SessionContext {
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
@Override
@ -822,6 +826,16 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
@Override
public int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException {
if (response instanceof SessionQueueQueryResponseMessage_V3) {
final Integer defaultConsumerWindowSize = ((SessionQueueQueryResponseMessage_V3) response).getDefaultConsumerWindowSize();
return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
} else {
return ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
}
}
private Channel getCreateChannel() {
return getCoreConnection().getChannel(1, -1);
}

View File

@ -38,12 +38,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
protected Boolean lastValue;
protected Integer defaultConsumerWindowSize;
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue());
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getDefaultConsumerWindowSize());
}
public SessionQueueQueryResponseMessage_V3() {
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null);
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@ -60,7 +62,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final RoutingType routingType,
final int maxConsumers,
final Boolean exclusive,
final Boolean lastValue) {
final Boolean lastValue,
final Integer defaultConsumerWindowSize) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@ -92,6 +95,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.exclusive = exclusive;
this.lastValue = lastValue;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
public boolean isAutoCreated() {
@ -142,6 +147,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.lastValue = lastValue;
}
public Integer getDefaultConsumerWindowSize() {
return defaultConsumerWindowSize;
}
public void setDefaultConsumerWindowSize(Integer defaultConsumerWindowSize) {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@ -151,6 +164,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buffer.writeInt(maxConsumers);
BufferHelper.writeNullableBoolean(buffer, exclusive);
BufferHelper.writeNullableBoolean(buffer, lastValue);
BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
}
@Override
@ -164,6 +178,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
exclusive = BufferHelper.readNullableBoolean(buffer);
lastValue = BufferHelper.readNullableBoolean(buffer);
}
if (buffer.readableBytes() > 0) {
defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
}
}
@Override
@ -176,6 +193,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + maxConsumers;
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
return result;
}
@ -195,12 +213,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", maxConsumers=" + maxConsumers);
buff.append(", exclusive=" + exclusive);
buff.append(", lastValue=" + lastValue);
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue());
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getDefaultConsumerWindowSize());
}
@Override
@ -226,6 +245,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!lastValue.equals(other.lastValue))
return false;
if (defaultConsumerWindowSize == null) {
if (other.defaultConsumerWindowSize != null)
return false;
} else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
return false;
if (routingType == null) {
if (other.routingType != null)
return false;

View File

@ -51,6 +51,8 @@ public class QueueQueryResult {
private Boolean lastValue;
private Integer defaultConsumerWindowSize;
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@ -65,7 +67,8 @@ public class QueueQueryResult {
final RoutingType routingType,
final int maxConsumers,
final Boolean exclusive,
final Boolean lastValue) {
final Boolean lastValue,
final Integer defaultConsumerWindowSize) {
this.durable = durable;
this.temporary = temporary;
@ -95,6 +98,8 @@ public class QueueQueryResult {
this.exclusive = exclusive;
this.lastValue = lastValue;
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
public boolean isExists() {
@ -160,4 +165,8 @@ public class QueueQueryResult {
public Boolean isLastValue() {
return lastValue;
}
public Integer getDefaultConsumerWindowSize() {
return defaultConsumerWindowSize;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -325,6 +326,8 @@ public abstract class SessionContext {
public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
public abstract int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException;
// Failover utility classes
/**

View File

@ -241,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size";
// Attributes ----------------------------------------------------
@ -1068,6 +1070,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
RoutingType routingType = RoutingType.valueOf(value);
addressSettings.setDefaultAddressRoutingType(routingType);
} else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) {
addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
}
}
return setting;

View File

@ -358,4 +358,6 @@ public interface ServerSession extends SecurityAuth {
int getConsumerCount();
int getProducerCount();
int getDefaultConsumerWindowSize(SimpleString address);
}

View File

@ -865,11 +865,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
boolean defaultPurgeOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultPurgeOnNoConsumers();
int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers();
boolean defaultExclusiveQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultExclusiveQueue();
boolean defaultLastValueQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultLastValueQueue();
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue();
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
QueueQueryResult response;
@ -884,14 +887,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue());
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), defaultConsumerWindowSize);
} else if (name.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false);
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, defaultConsumerWindowSize);
} else if (autoCreateQueues) {
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue);
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultConsumerWindowSize);
} else {
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null);
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, defaultConsumerWindowSize);
}
return response;

View File

@ -1904,4 +1904,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public int getProducerCount() {
return getServerProducers().size();
}
@Override
public int getDefaultConsumerWindowSize(SimpleString address) {
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
return as.getDefaultConsumerWindowSize();
}
}

View File

@ -21,6 +21,7 @@ import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.Mergeable;
@ -178,6 +179,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private RoutingType defaultAddressRoutingType = null;
private Integer defaultConsumerWindowSize = null;
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@ -222,6 +225,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
this.defaultQueueRoutingType = other.defaultQueueRoutingType;
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
}
public AddressSettings() {
@ -579,6 +583,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
/**
* @return the defaultConsumerWindowSize
*/
public int getDefaultConsumerWindowSize() {
return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
}
/**
* @param defaultConsumerWindowSize the defaultConsumerWindowSize to set
*/
public AddressSettings setDefaultConsumerWindowSize(int defaultConsumerWindowSize) {
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
return this;
}
/**
* merge 2 objects in to 1
*
@ -694,6 +713,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (defaultExclusiveQueue == null) {
defaultExclusiveQueue = merged.defaultExclusiveQueue;
}
if (defaultConsumerWindowSize == null) {
defaultConsumerWindowSize = merged.defaultConsumerWindowSize;
}
if (defaultLastValueQueue == null) {
defaultLastValueQueue = merged.defaultLastValueQueue;
}
@ -811,6 +833,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
}
if (buffer.readableBytes() > 0) {
defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
}
}
@Override
@ -851,7 +877,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
DataConstants.SIZE_BYTE +
BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) +
BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize);
}
@Override
@ -932,6 +959,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
}
/* (non-Javadoc)
@ -980,6 +1009,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
return result;
}
@ -1197,6 +1227,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
return false;
if (defaultConsumerWindowSize == null) {
if (other.defaultConsumerWindowSize != null)
return false;
} else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
return false;
return true;
}
@ -1280,6 +1316,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultConsumersBeforeDispatch +
", defaultDelayBeforeDispatch=" +
defaultDelayBeforeDispatch +
", defaultClientWindowSize=" +
defaultConsumerWindowSize +
"]";
}
}

View File

@ -3011,6 +3011,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="default-consumer-window-size" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the default window size for a consumer
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">

View File

@ -346,6 +346,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());

View File

@ -337,6 +337,7 @@
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-consumer-window-size>10000</default-consumer-window-size>
</address-setting>
</address-settings>
<resource-limit-settings>

View File

@ -62,5 +62,6 @@
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
<default-consumer-window-size>10000</default-consumer-window-size>
</address-setting>
</address-settings>

View File

@ -790,3 +790,8 @@ types](#routing-type).
address if the broker is unable to determine the routing-type based on the
client and/or protocol semantics. Default is `MULTICAST`. Read more about
[routing types](#routing-type).
`default-consumer-window-size` defines the default `consumerWindowSize` value
for a `CORE` protocol consumer, if not defined the default will be set to
1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size
if the value is not set on the client. Read more about [flow control](#flow-control).

View File

@ -33,7 +33,7 @@ buffered on each consumer is determined by the `consumerWindowSize`
parameter.
By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
bytes).
bytes) unless overridden via ([Address Settings](address-model.md#configuring-addresses-and-queues-via-address-settings))
The value can be:

View File

@ -24,7 +24,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -1386,4 +1389,65 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
}
}
@Test
public void testDefaultConsumerWindowSize() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());
messagingService.start();
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, true, true);
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
consumer.start();
assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE / 2, consumer.getClientWindowSize());
}
@Test
public void testConsumerWindowSizeAddressSettings() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());
final int defaultConsumerWindowSize = 1024 * 5;
final AddressSettings settings = new AddressSettings();
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
messagingService.getConfiguration()
.getAddressesSettings().put(queueA.toString(), settings);
messagingService.start();
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, true, true);
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
session.start();
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
}
@Test
public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());
final int defaultConsumerWindowSize = 1024 * 5;
final AddressSettings settings = new AddressSettings();
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
messagingService.getConfiguration()
.getAddressesSettings().put("#", settings);
messagingService.start();
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, true, true);
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
ClientConsumerImpl consumer2 = (ClientConsumerImpl) session.createConsumer(queueA);
session.start();
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
assertEquals(defaultConsumerWindowSize / 2, consumer2.getClientWindowSize());
}
}