This closes #2191
This commit is contained in:
commit
5b07c635a2
|
@ -147,6 +147,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
|
||||||
Boolean isExclusive();
|
Boolean isExclusive();
|
||||||
|
|
||||||
Boolean isLastValue();
|
Boolean isLastValue();
|
||||||
|
|
||||||
|
Integer getDefaultConsumerWindowSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lifecycle operations ------------------------------------------
|
// Lifecycle operations ------------------------------------------
|
||||||
|
|
|
@ -16,9 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.client.impl;
|
package org.apache.activemq.artemis.core.client.impl;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
|
||||||
|
|
||||||
public class QueueQueryImpl implements ClientSession.QueueQuery {
|
public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
|
|
||||||
|
@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
|
|
||||||
private final Boolean lastValue;
|
private final Boolean lastValue;
|
||||||
|
|
||||||
|
private final Integer defaultConsumerWindowSize;
|
||||||
|
|
||||||
public QueueQueryImpl(final boolean durable,
|
public QueueQueryImpl(final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
final int consumerCount,
|
final int consumerCount,
|
||||||
|
@ -88,7 +90,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
final boolean autoCreated,
|
final boolean autoCreated,
|
||||||
final boolean purgeOnNoConsumers,
|
final boolean purgeOnNoConsumers,
|
||||||
final RoutingType routingType) {
|
final RoutingType routingType) {
|
||||||
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null);
|
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null, null);
|
||||||
}
|
}
|
||||||
public QueueQueryImpl(final boolean durable,
|
public QueueQueryImpl(final boolean durable,
|
||||||
final boolean temporary,
|
final boolean temporary,
|
||||||
|
@ -104,7 +106,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
final boolean purgeOnNoConsumers,
|
final boolean purgeOnNoConsumers,
|
||||||
final RoutingType routingType,
|
final RoutingType routingType,
|
||||||
final Boolean exclusive,
|
final Boolean exclusive,
|
||||||
final Boolean lastValue) {
|
final Boolean lastValue,
|
||||||
|
final Integer defaultConsumerWindowSize) {
|
||||||
this.durable = durable;
|
this.durable = durable;
|
||||||
this.temporary = temporary;
|
this.temporary = temporary;
|
||||||
this.consumerCount = consumerCount;
|
this.consumerCount = consumerCount;
|
||||||
|
@ -120,6 +123,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
this.routingType = routingType;
|
this.routingType = routingType;
|
||||||
this.exclusive = exclusive;
|
this.exclusive = exclusive;
|
||||||
this.lastValue = lastValue;
|
this.lastValue = lastValue;
|
||||||
|
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -197,5 +201,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
|
||||||
return lastValue;
|
return lastValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Integer getDefaultConsumerWindowSize() {
|
||||||
|
return defaultConsumerWindowSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.protocol.core.impl;
|
package org.apache.activemq.artemis.core.protocol.core.impl;
|
||||||
|
|
||||||
import javax.transaction.xa.XAException;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
|
||||||
import javax.transaction.xa.XAResource;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
|
||||||
import javax.transaction.xa.Xid;
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
|
||||||
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
|
||||||
|
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
|
||||||
|
|
||||||
import java.security.AccessController;
|
import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
@ -29,6 +32,10 @@ import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.transaction.xa.XAException;
|
||||||
|
import javax.transaction.xa.XAResource;
|
||||||
|
import javax.transaction.xa.Xid;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
@ -37,6 +44,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
|
||||||
|
@ -85,6 +93,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
|
||||||
|
@ -115,12 +124,6 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
|
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
|
|
||||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
|
|
||||||
|
|
||||||
public class ActiveMQSessionContext extends SessionContext {
|
public class ActiveMQSessionContext extends SessionContext {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
|
private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
|
||||||
|
@ -324,8 +327,9 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
// The actual windows size that gets used is determined by the user since
|
// The actual windows size that gets used is determined by the user since
|
||||||
// could be overridden on the queue settings
|
// could be overridden on the queue settings
|
||||||
// The value we send is just a hint
|
// The value we send is just a hint
|
||||||
|
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
|
||||||
|
|
||||||
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
|
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -822,6 +826,16 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException {
|
||||||
|
if (response instanceof SessionQueueQueryResponseMessage_V3) {
|
||||||
|
final Integer defaultConsumerWindowSize = ((SessionQueueQueryResponseMessage_V3) response).getDefaultConsumerWindowSize();
|
||||||
|
return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
|
||||||
|
} else {
|
||||||
|
return ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Channel getCreateChannel() {
|
private Channel getCreateChannel() {
|
||||||
return getCoreConnection().getChannel(1, -1);
|
return getCoreConnection().getChannel(1, -1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,12 +38,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
|
|
||||||
protected Boolean lastValue;
|
protected Boolean lastValue;
|
||||||
|
|
||||||
|
protected Integer defaultConsumerWindowSize;
|
||||||
|
|
||||||
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
|
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
|
||||||
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue());
|
this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getDefaultConsumerWindowSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
public SessionQueueQueryResponseMessage_V3() {
|
public SessionQueueQueryResponseMessage_V3() {
|
||||||
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null);
|
this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
|
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
|
||||||
|
@ -60,7 +62,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
final RoutingType routingType,
|
final RoutingType routingType,
|
||||||
final int maxConsumers,
|
final int maxConsumers,
|
||||||
final Boolean exclusive,
|
final Boolean exclusive,
|
||||||
final Boolean lastValue) {
|
final Boolean lastValue,
|
||||||
|
final Integer defaultConsumerWindowSize) {
|
||||||
super(SESS_QUEUEQUERY_RESP_V3);
|
super(SESS_QUEUEQUERY_RESP_V3);
|
||||||
|
|
||||||
this.durable = durable;
|
this.durable = durable;
|
||||||
|
@ -92,6 +95,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
this.exclusive = exclusive;
|
this.exclusive = exclusive;
|
||||||
|
|
||||||
this.lastValue = lastValue;
|
this.lastValue = lastValue;
|
||||||
|
|
||||||
|
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAutoCreated() {
|
public boolean isAutoCreated() {
|
||||||
|
@ -142,6 +147,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
this.lastValue = lastValue;
|
this.lastValue = lastValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Integer getDefaultConsumerWindowSize() {
|
||||||
|
return defaultConsumerWindowSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDefaultConsumerWindowSize(Integer defaultConsumerWindowSize) {
|
||||||
|
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void encodeRest(final ActiveMQBuffer buffer) {
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
super.encodeRest(buffer);
|
super.encodeRest(buffer);
|
||||||
|
@ -151,6 +164,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
buffer.writeInt(maxConsumers);
|
buffer.writeInt(maxConsumers);
|
||||||
BufferHelper.writeNullableBoolean(buffer, exclusive);
|
BufferHelper.writeNullableBoolean(buffer, exclusive);
|
||||||
BufferHelper.writeNullableBoolean(buffer, lastValue);
|
BufferHelper.writeNullableBoolean(buffer, lastValue);
|
||||||
|
BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -164,6 +178,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
exclusive = BufferHelper.readNullableBoolean(buffer);
|
exclusive = BufferHelper.readNullableBoolean(buffer);
|
||||||
lastValue = BufferHelper.readNullableBoolean(buffer);
|
lastValue = BufferHelper.readNullableBoolean(buffer);
|
||||||
}
|
}
|
||||||
|
if (buffer.readableBytes() > 0) {
|
||||||
|
defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -176,6 +193,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
result = prime * result + maxConsumers;
|
result = prime * result + maxConsumers;
|
||||||
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
|
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
|
||||||
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
|
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
|
||||||
|
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,12 +213,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
buff.append(", maxConsumers=" + maxConsumers);
|
buff.append(", maxConsumers=" + maxConsumers);
|
||||||
buff.append(", exclusive=" + exclusive);
|
buff.append(", exclusive=" + exclusive);
|
||||||
buff.append(", lastValue=" + lastValue);
|
buff.append(", lastValue=" + lastValue);
|
||||||
|
buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
|
||||||
return buff.toString();
|
return buff.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientSession.QueueQuery toQueueQuery() {
|
public ClientSession.QueueQuery toQueueQuery() {
|
||||||
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue());
|
return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getDefaultConsumerWindowSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -226,6 +245,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
|
||||||
return false;
|
return false;
|
||||||
} else if (!lastValue.equals(other.lastValue))
|
} else if (!lastValue.equals(other.lastValue))
|
||||||
return false;
|
return false;
|
||||||
|
if (defaultConsumerWindowSize == null) {
|
||||||
|
if (other.defaultConsumerWindowSize != null)
|
||||||
|
return false;
|
||||||
|
} else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
|
||||||
|
return false;
|
||||||
if (routingType == null) {
|
if (routingType == null) {
|
||||||
if (other.routingType != null)
|
if (other.routingType != null)
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -51,6 +51,8 @@ public class QueueQueryResult {
|
||||||
|
|
||||||
private Boolean lastValue;
|
private Boolean lastValue;
|
||||||
|
|
||||||
|
private Integer defaultConsumerWindowSize;
|
||||||
|
|
||||||
public QueueQueryResult(final SimpleString name,
|
public QueueQueryResult(final SimpleString name,
|
||||||
final SimpleString address,
|
final SimpleString address,
|
||||||
final boolean durable,
|
final boolean durable,
|
||||||
|
@ -65,7 +67,8 @@ public class QueueQueryResult {
|
||||||
final RoutingType routingType,
|
final RoutingType routingType,
|
||||||
final int maxConsumers,
|
final int maxConsumers,
|
||||||
final Boolean exclusive,
|
final Boolean exclusive,
|
||||||
final Boolean lastValue) {
|
final Boolean lastValue,
|
||||||
|
final Integer defaultConsumerWindowSize) {
|
||||||
this.durable = durable;
|
this.durable = durable;
|
||||||
|
|
||||||
this.temporary = temporary;
|
this.temporary = temporary;
|
||||||
|
@ -95,6 +98,8 @@ public class QueueQueryResult {
|
||||||
this.exclusive = exclusive;
|
this.exclusive = exclusive;
|
||||||
|
|
||||||
this.lastValue = lastValue;
|
this.lastValue = lastValue;
|
||||||
|
|
||||||
|
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isExists() {
|
public boolean isExists() {
|
||||||
|
@ -160,4 +165,8 @@ public class QueueQueryResult {
|
||||||
public Boolean isLastValue() {
|
public Boolean isLastValue() {
|
||||||
return lastValue;
|
return lastValue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Integer getDefaultConsumerWindowSize() {
|
||||||
|
return defaultConsumerWindowSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
|
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.IDGenerator;
|
import org.apache.activemq.artemis.utils.IDGenerator;
|
||||||
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
|
||||||
|
@ -325,6 +326,8 @@ public abstract class SessionContext {
|
||||||
|
|
||||||
public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
|
public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
|
||||||
|
|
||||||
|
public abstract int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException;
|
||||||
|
|
||||||
// Failover utility classes
|
// Failover utility classes
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -241,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
|
|
||||||
private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
|
private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
|
||||||
|
|
||||||
|
private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size";
|
||||||
|
|
||||||
|
|
||||||
// Attributes ----------------------------------------------------
|
// Attributes ----------------------------------------------------
|
||||||
|
|
||||||
|
@ -1068,6 +1070,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
||||||
Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
|
Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
|
||||||
RoutingType routingType = RoutingType.valueOf(value);
|
RoutingType routingType = RoutingType.valueOf(value);
|
||||||
addressSettings.setDefaultAddressRoutingType(routingType);
|
addressSettings.setDefaultAddressRoutingType(routingType);
|
||||||
|
} else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) {
|
||||||
|
addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return setting;
|
return setting;
|
||||||
|
|
|
@ -358,4 +358,6 @@ public interface ServerSession extends SecurityAuth {
|
||||||
int getConsumerCount();
|
int getConsumerCount();
|
||||||
|
|
||||||
int getProducerCount();
|
int getProducerCount();
|
||||||
|
|
||||||
|
int getDefaultConsumerWindowSize(SimpleString address);
|
||||||
}
|
}
|
|
@ -865,11 +865,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
|
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
|
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
|
||||||
boolean defaultPurgeOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultPurgeOnNoConsumers();
|
|
||||||
int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers();
|
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
|
||||||
boolean defaultExclusiveQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultExclusiveQueue();
|
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
|
||||||
boolean defaultLastValueQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultLastValueQueue();
|
int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
|
||||||
|
boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue();
|
||||||
|
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
|
||||||
|
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
|
||||||
|
|
||||||
QueueQueryResult response;
|
QueueQueryResult response;
|
||||||
|
|
||||||
|
@ -884,14 +887,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
SimpleString filterString = filter == null ? null : filter.getFilterString();
|
SimpleString filterString = filter == null ? null : filter.getFilterString();
|
||||||
|
|
||||||
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue());
|
response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), defaultConsumerWindowSize);
|
||||||
} else if (name.equals(managementAddress)) {
|
} else if (name.equals(managementAddress)) {
|
||||||
// make an exception for the management address (see HORNETQ-29)
|
// make an exception for the management address (see HORNETQ-29)
|
||||||
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false);
|
response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, defaultConsumerWindowSize);
|
||||||
} else if (autoCreateQueues) {
|
} else if (autoCreateQueues) {
|
||||||
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue);
|
response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultConsumerWindowSize);
|
||||||
} else {
|
} else {
|
||||||
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null);
|
response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, defaultConsumerWindowSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
|
|
|
@ -1904,4 +1904,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
||||||
public int getProducerCount() {
|
public int getProducerCount() {
|
||||||
return getServerProducers().size();
|
return getServerProducers().size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getDefaultConsumerWindowSize(SimpleString address) {
|
||||||
|
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
|
||||||
|
return as.getDefaultConsumerWindowSize();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -21,6 +21,7 @@ import java.io.Serializable;
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.core.settings.Mergeable;
|
import org.apache.activemq.artemis.core.settings.Mergeable;
|
||||||
|
@ -178,6 +179,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
private RoutingType defaultAddressRoutingType = null;
|
private RoutingType defaultAddressRoutingType = null;
|
||||||
|
|
||||||
|
private Integer defaultConsumerWindowSize = null;
|
||||||
|
|
||||||
//from amq5
|
//from amq5
|
||||||
//make it transient
|
//make it transient
|
||||||
private transient Integer queuePrefetch = null;
|
private transient Integer queuePrefetch = null;
|
||||||
|
@ -222,6 +225,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
|
this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
|
||||||
this.defaultQueueRoutingType = other.defaultQueueRoutingType;
|
this.defaultQueueRoutingType = other.defaultQueueRoutingType;
|
||||||
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
|
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
|
||||||
|
this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AddressSettings() {
|
public AddressSettings() {
|
||||||
|
@ -579,6 +583,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the defaultConsumerWindowSize
|
||||||
|
*/
|
||||||
|
public int getDefaultConsumerWindowSize() {
|
||||||
|
return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param defaultConsumerWindowSize the defaultConsumerWindowSize to set
|
||||||
|
*/
|
||||||
|
public AddressSettings setDefaultConsumerWindowSize(int defaultConsumerWindowSize) {
|
||||||
|
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* merge 2 objects in to 1
|
* merge 2 objects in to 1
|
||||||
*
|
*
|
||||||
|
@ -694,6 +713,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
if (defaultExclusiveQueue == null) {
|
if (defaultExclusiveQueue == null) {
|
||||||
defaultExclusiveQueue = merged.defaultExclusiveQueue;
|
defaultExclusiveQueue = merged.defaultExclusiveQueue;
|
||||||
}
|
}
|
||||||
|
if (defaultConsumerWindowSize == null) {
|
||||||
|
defaultConsumerWindowSize = merged.defaultConsumerWindowSize;
|
||||||
|
}
|
||||||
if (defaultLastValueQueue == null) {
|
if (defaultLastValueQueue == null) {
|
||||||
defaultLastValueQueue = merged.defaultLastValueQueue;
|
defaultLastValueQueue = merged.defaultLastValueQueue;
|
||||||
}
|
}
|
||||||
|
@ -811,6 +833,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
if (buffer.readableBytes() > 0) {
|
if (buffer.readableBytes() > 0) {
|
||||||
defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
|
defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (buffer.readableBytes() > 0) {
|
||||||
|
defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -851,7 +877,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
DataConstants.SIZE_BYTE +
|
DataConstants.SIZE_BYTE +
|
||||||
BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
|
BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
|
||||||
BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
|
BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
|
||||||
BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
|
BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) +
|
||||||
|
BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -932,6 +959,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
|
|
||||||
BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
|
BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
|
||||||
|
|
||||||
|
BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
|
@ -980,6 +1009,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
|
result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
|
||||||
result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
|
result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
|
||||||
result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
|
result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
|
||||||
|
result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1197,6 +1227,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
return false;
|
return false;
|
||||||
} else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
|
} else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
if (defaultConsumerWindowSize == null) {
|
||||||
|
if (other.defaultConsumerWindowSize != null)
|
||||||
|
return false;
|
||||||
|
} else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
|
||||||
|
return false;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1280,6 +1316,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
||||||
defaultConsumersBeforeDispatch +
|
defaultConsumersBeforeDispatch +
|
||||||
", defaultDelayBeforeDispatch=" +
|
", defaultDelayBeforeDispatch=" +
|
||||||
defaultDelayBeforeDispatch +
|
defaultDelayBeforeDispatch +
|
||||||
|
", defaultClientWindowSize=" +
|
||||||
|
defaultConsumerWindowSize +
|
||||||
"]";
|
"]";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3011,6 +3011,14 @@
|
||||||
</xsd:annotation>
|
</xsd:annotation>
|
||||||
</xsd:element>
|
</xsd:element>
|
||||||
|
|
||||||
|
<xsd:element name="default-consumer-window-size" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
|
||||||
|
<xsd:annotation>
|
||||||
|
<xsd:documentation>
|
||||||
|
the default window size for a consumer
|
||||||
|
</xsd:documentation>
|
||||||
|
</xsd:annotation>
|
||||||
|
</xsd:element>
|
||||||
|
|
||||||
</xsd:all>
|
</xsd:all>
|
||||||
|
|
||||||
<xsd:attribute name="match" type="xsd:string" use="required">
|
<xsd:attribute name="match" type="xsd:string" use="required">
|
||||||
|
|
|
@ -346,6 +346,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
||||||
assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
|
assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
|
||||||
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
|
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
|
||||||
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
|
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
|
||||||
|
assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
|
||||||
|
|
||||||
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
|
||||||
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
|
||||||
|
|
|
@ -337,6 +337,7 @@
|
||||||
<default-max-consumers>15</default-max-consumers>
|
<default-max-consumers>15</default-max-consumers>
|
||||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||||
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
||||||
|
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
||||||
<resource-limit-settings>
|
<resource-limit-settings>
|
||||||
|
|
|
@ -62,5 +62,6 @@
|
||||||
<default-max-consumers>15</default-max-consumers>
|
<default-max-consumers>15</default-max-consumers>
|
||||||
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
|
||||||
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
<default-address-routing-type>ANYCAST</default-address-routing-type>
|
||||||
|
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||||
</address-setting>
|
</address-setting>
|
||||||
</address-settings>
|
</address-settings>
|
|
@ -790,3 +790,8 @@ types](#routing-type).
|
||||||
address if the broker is unable to determine the routing-type based on the
|
address if the broker is unable to determine the routing-type based on the
|
||||||
client and/or protocol semantics. Default is `MULTICAST`. Read more about
|
client and/or protocol semantics. Default is `MULTICAST`. Read more about
|
||||||
[routing types](#routing-type).
|
[routing types](#routing-type).
|
||||||
|
|
||||||
|
`default-consumer-window-size` defines the default `consumerWindowSize` value
|
||||||
|
for a `CORE` protocol consumer, if not defined the default will be set to
|
||||||
|
1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size
|
||||||
|
if the value is not set on the client. Read more about [flow control](#flow-control).
|
||||||
|
|
|
@ -33,7 +33,7 @@ buffered on each consumer is determined by the `consumerWindowSize`
|
||||||
parameter.
|
parameter.
|
||||||
|
|
||||||
By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
|
By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
|
||||||
bytes).
|
bytes) unless overridden via ([Address Settings](address-model.md#configuring-addresses-and-queues-via-address-settings))
|
||||||
|
|
||||||
The value can be:
|
The value can be:
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,9 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||||
|
@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
|
@ -1386,4 +1389,65 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDefaultConsumerWindowSize() throws Exception {
|
||||||
|
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||||
|
|
||||||
|
messagingService.start();
|
||||||
|
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
|
||||||
|
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
|
ClientSession session = cf.createSession(false, true, true);
|
||||||
|
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
|
||||||
|
|
||||||
|
consumer.start();
|
||||||
|
|
||||||
|
assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE / 2, consumer.getClientWindowSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumerWindowSizeAddressSettings() throws Exception {
|
||||||
|
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||||
|
|
||||||
|
final int defaultConsumerWindowSize = 1024 * 5;
|
||||||
|
final AddressSettings settings = new AddressSettings();
|
||||||
|
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
|
||||||
|
messagingService.getConfiguration()
|
||||||
|
.getAddressesSettings().put(queueA.toString(), settings);
|
||||||
|
|
||||||
|
messagingService.start();
|
||||||
|
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
|
||||||
|
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
|
ClientSession session = cf.createSession(false, true, true);
|
||||||
|
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
|
||||||
|
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||||
|
|
||||||
|
final int defaultConsumerWindowSize = 1024 * 5;
|
||||||
|
final AddressSettings settings = new AddressSettings();
|
||||||
|
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
|
||||||
|
messagingService.getConfiguration()
|
||||||
|
.getAddressesSettings().put("#", settings);
|
||||||
|
|
||||||
|
messagingService.start();
|
||||||
|
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
|
||||||
|
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
|
ClientSession session = cf.createSession(false, true, true);
|
||||||
|
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
|
||||||
|
ClientConsumerImpl consumer2 = (ClientConsumerImpl) session.createConsumer(queueA);
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
|
||||||
|
assertEquals(defaultConsumerWindowSize / 2, consumer2.getClientWindowSize());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue