ARTEMIS-589 flow control for individual STOMP subscribers

This commit is contained in:
Justin Bertram 2020-09-11 10:43:34 -05:00 committed by Clebert Suconic
parent ae5535b11f
commit e47eb5ae20
16 changed files with 357 additions and 66 deletions

View File

@ -165,9 +165,12 @@ public class TransportConstants {
public static final String CLUSTER_CONNECTION = "clusterConnection"; public static final String CLUSTER_CONNECTION = "clusterConnection";
@Deprecated
public static final String STOMP_CONSUMERS_CREDIT = "stompConsumerCredits"; public static final String STOMP_CONSUMERS_CREDIT = "stompConsumerCredits";
public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K public static final String STOMP_CONSUMER_WINDOW_SIZE = "stompConsumerWindowSize";
public static final int STOMP_DEFAULT_CONSUMER_WINDOW_SIZE = 10 * 1024; // 10K
public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled"; public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
@ -396,6 +399,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER); allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);

View File

@ -157,6 +157,13 @@ public interface Stomp {
*/ */
String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName"; String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = "activemq.subscriptionName";
/**
* Backwards compatibility for STOMP clients that were using 5.x
*/
String ACTIVEMQ_PREFETCH_SIZE = "activemq.prefetchSize";
String CONSUMER_WINDOW_SIZE = "consumer-window-size";
String SUBSCRIPTION_TYPE = "subscription-type"; String SUBSCRIPTION_TYPE = "subscription-type";
String NO_LOCAL = "no-local"; String NO_LOCAL = "no-local";

View File

@ -728,7 +728,8 @@ public final class StompConnection implements RemotingConnection {
String id, String id,
String durableSubscriptionName, String durableSubscriptionName,
boolean noLocal, boolean noLocal,
RoutingType subscriptionType) throws ActiveMQStompException { RoutingType subscriptionType,
Integer consumerWindowSize) throws ActiveMQStompException {
autoCreateDestinationIfPossible(destination, subscriptionType); autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination); checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType); checkRoutingSemantics(destination, subscriptionType);
@ -756,7 +757,7 @@ public final class StompConnection implements RemotingConnection {
} }
try { try {
return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal); return manager.subscribe(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal, consumerWindowSize);
} catch (ActiveMQStompException e) { } catch (ActiveMQStompException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {

View File

@ -77,12 +77,19 @@ public class StompFrame {
@Override @Override
public String toString() { public String toString() {
return new StringBuilder() StringBuilder result = new StringBuilder()
.append("StompFrame[command=").append(command) .append("StompFrame[command=").append(command)
.append(", headers=").append(headers) .append(", headers=").append(headers);
.append(", content= ").append(this.body)
.append(", bytes= ").append(Arrays.toString(bytesBody)) if (command.equals(Stomp.Responses.MESSAGE) || command.equals(Stomp.Responses.ERROR) || command.equals(Stomp.Commands.SEND)) {
.toString(); result.append(", body=").append(this.getBody())
.append(", body-bytes=").append(Arrays.toString(bytesBody))
.append(", size=").append(size);
}
result.append("]");
return result.toString();
} }
public boolean isPing() { public boolean isPing() {

View File

@ -346,7 +346,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
String destination, String destination,
String selector, String selector,
String ack, String ack,
boolean noLocal) throws Exception { boolean noLocal,
Integer consumerWindowSize) throws Exception {
StompSession stompSession = getSession(connection); StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal); stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) { if (stompSession.containsSubscription(subscriptionID)) {
@ -354,7 +355,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
". Either use unique subscription IDs or do not create multiple subscriptions for the same destination"); ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
} }
long consumerID = server.getStorageManager().generateID(); long consumerID = server.getStorageManager().generateID();
return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack); return stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(), durableSubscriptionName, destination, selector, ack, consumerWindowSize);
} }
public void unsubscribe(StompConnection connection, public void unsubscribe(StompConnection connection,

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.stomp; package org.apache.activemq.artemis.core.protocol.stomp;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
@ -71,13 +72,10 @@ public class StompSession implements SessionCallback {
private volatile boolean noLocal = false; private volatile boolean noLocal = false;
private final int consumerCredits;
StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) { StompSession(final StompConnection connection, final StompProtocolManager manager, OperationContext sessionContext) {
this.connection = connection; this.connection = connection;
this.manager = manager; this.manager = manager;
this.sessionContext = sessionContext; this.sessionContext = sessionContext;
this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
} }
@Override @Override
@ -216,14 +214,13 @@ public class StompSession implements SessionCallback {
public void acknowledge(String messageID, String subscriptionID) throws Exception { public void acknowledge(String messageID, String subscriptionID) throws Exception {
long id = Long.parseLong(messageID); long id = Long.parseLong(messageID);
Pair<Long, Integer> pair = messagesToAck.remove(id); Pair<Long, Integer> pair = messagesToAck.get(id);
if (pair == null) { if (pair == null) {
throw BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler()); throw BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler());
} }
long consumerID = pair.getA(); long consumerID = pair.getA();
int credits = pair.getB();
StompSubscription sub = subscriptions.get(consumerID); StompSubscription sub = subscriptions.get(consumerID);
@ -233,14 +230,20 @@ public class StompSession implements SessionCallback {
} }
} }
if (this.consumerCredits != -1) {
session.receiveConsumerCredits(consumerID, credits);
}
if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)) { if (sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)) {
session.individualAcknowledge(consumerID, id); session.individualAcknowledge(consumerID, id);
if (sub.getConsumerWindowSize() != -1) {
session.receiveConsumerCredits(consumerID, messagesToAck.remove(id).getB());
}
} else { } else {
session.acknowledge(consumerID, id); List<Long> ackedRefs = session.acknowledge(consumerID, id);
if (sub.getConsumerWindowSize() != -1) {
for (Long ackedID : ackedRefs) {
session.receiveConsumerCredits(consumerID, messagesToAck.remove(ackedID).getB());
}
}
} }
session.commit(); session.commit();
@ -252,11 +255,20 @@ public class StompSession implements SessionCallback {
String durableSubscriptionName, String durableSubscriptionName,
String destination, String destination,
String selector, String selector,
String ack) throws Exception { String ack,
Integer consumerWindowSize) throws Exception {
SimpleString address = SimpleString.toSimpleString(destination); SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination); SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector); SimpleString selectorSimple = SimpleString.toSimpleString(selector);
final int receiveCredits = ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits; final int finalConsumerWindowSize;
if (consumerWindowSize != null) {
finalConsumerWindowSize = consumerWindowSize;
} else if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
finalConsumerWindowSize = -1;
} else {
finalConsumerWindowSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE, ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMER_WINDOW_SIZE, connection.getAcceptorUsed().getConfiguration()), connection.getAcceptorUsed().getConfiguration());
}
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes(); Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST); boolean multicast = routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST);
@ -281,10 +293,14 @@ public class StompSession implements SessionCallback {
} }
} }
final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0); final ServerConsumer consumer = session.createConsumer(consumerID, queueName, multicast ? null : selectorSimple, false, false, 0);
StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast); StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName, multicast, finalConsumerWindowSize);
subscriptions.put(consumerID, subscription); subscriptions.put(consumerID, subscription);
session.start(); session.start();
return () -> consumer.receiveCredits(receiveCredits); /*
* If the consumerWindowSize is 0 then we need to supply at least 1 credit otherwise messages will *never* flow.
* See org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl#startSlowConsumer()
*/
return () -> consumer.receiveCredits(finalConsumerWindowSize == 0 ? 1 : finalConsumerWindowSize);
} }
public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception { public boolean unsubscribe(String id, String durableSubscriptionName, String clientID) throws Exception {

View File

@ -19,9 +19,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
public class StompSubscription { public class StompSubscription {
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
private final String subID; private final String subID;
@ -32,19 +29,16 @@ public class StompSubscription {
// whether or not this subscription follows multicast semantics (e.g. for a JMS topic) // whether or not this subscription follows multicast semantics (e.g. for a JMS topic)
private final boolean multicast; private final boolean multicast;
// Static -------------------------------------------------------- private final int consumerWindowSize;
// Constructors -------------------------------------------------- public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast, int consumerWindowSize) {
public StompSubscription(String subID, String ack, SimpleString queueName, boolean multicast) {
this.subID = subID; this.subID = subID;
this.ack = ack; this.ack = ack;
this.queueName = queueName; this.queueName = queueName;
this.multicast = multicast; this.multicast = multicast;
this.consumerWindowSize = consumerWindowSize;
} }
// Public --------------------------------------------------------
public String getAck() { public String getAck() {
return ack; return ack;
} }
@ -61,9 +55,13 @@ public class StompSubscription {
return multicast; return multicast;
} }
public int getConsumerWindowSize() {
return consumerWindowSize;
}
@Override @Override
public String toString() { public String toString() {
return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + "]"; return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName + ", multicast=" + multicast + ", consumerWindowSize=" + consumerWindowSize + "]";
} }
} }

View File

@ -274,7 +274,13 @@ public abstract class VersionedStompFrameHandler {
} else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) { } else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
} }
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); Integer consumerWindowSize = null;
if (frame.hasHeader(Headers.Subscribe.CONSUMER_WINDOW_SIZE)) {
consumerWindowSize = Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE));
} else if (frame.hasHeader(Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)) {
consumerWindowSize = Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE));
}
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType, consumerWindowSize);
} }
public String getDestination(StompFrame request) throws Exception { public String getDestination(StompFrame request) throws Exception {

View File

@ -94,7 +94,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
Object protocolDataStart, Object protocolDataStart,
Object protocolDataEnd); Object protocolDataEnd);
void acknowledge(Transaction tx, long messageID) throws Exception; List<Long> acknowledge(Transaction tx, long messageID) throws Exception;
void individualAcknowledge(Transaction tx, long messageID) throws Exception; void individualAcknowledge(Transaction tx, long messageID) throws Exception;

View File

@ -62,7 +62,7 @@ public interface ServerSession extends SecurityAuth {
boolean removeConsumer(long consumerID) throws Exception; boolean removeConsumer(long consumerID) throws Exception;
void acknowledge(long consumerID, long messageID) throws Exception; List<Long> acknowledge(long consumerID, long messageID) throws Exception;
void individualAcknowledge(long consumerID, long messageID) throws Exception; void individualAcknowledge(long consumerID, long messageID) throws Exception;

View File

@ -888,11 +888,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
@Override @Override
public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception { public synchronized List<Long> acknowledge(Transaction tx, final long messageID) throws Exception {
if (browseOnly) { if (browseOnly) {
return; return null;
} }
List<Long> ackedRefs = null;
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged // acknowledged
@ -909,6 +911,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
try { try {
MessageReference ref; MessageReference ref;
ackedRefs = new ArrayList<>();
do { do {
synchronized (lock) { synchronized (lock) {
ref = deliveringRefs.poll(); ref = deliveringRefs.poll();
@ -925,6 +928,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
ref.acknowledge(tx, this); ref.acknowledge(tx, this);
ackedRefs.add(ref.getMessageID());
acks++; acks++;
} }
@ -950,6 +954,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
throw activeMQIllegalStateException; throw activeMQIllegalStateException;
} }
return ackedRefs;
} }
@Override @Override

View File

@ -1191,8 +1191,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
@Override @Override
public void acknowledge(final long consumerID, final long messageID) throws Exception { public List<Long> acknowledge(final long consumerID, final long messageID) throws Exception {
ServerConsumer consumer = findConsumer(consumerID); ServerConsumer consumer = findConsumer(consumerID);
List<Long> ackedRefs = null;
if (tx != null && tx.getState() == State.ROLLEDBACK) { if (tx != null && tx.getState() == State.ROLLEDBACK) {
// JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we will just
@ -1200,7 +1201,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
// The tx has already timed out, so we need to ack and rollback immediately // The tx has already timed out, so we need to ack and rollback immediately
Transaction newTX = newTransaction(); Transaction newTX = newTransaction();
try { try {
consumer.acknowledge(newTX, messageID); ackedRefs = consumer.acknowledge(newTX, messageID);
} catch (Exception e) { } catch (Exception e) {
// just ignored // just ignored
// will log it just in case // will log it just in case
@ -1209,8 +1210,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
} }
newTX.rollback(); newTX.rollback();
} else { } else {
consumer.acknowledge(autoCommitAcks ? null : tx, messageID); ackedRefs = consumer.acknowledge(autoCommitAcks ? null : tx, messageID);
} }
return ackedRefs;
} }
@Override @Override

View File

@ -367,3 +367,49 @@ parameter on the acceptor.
The `stomp-websockets` example shows how to configure an Apache ActiveMQ The `stomp-websockets` example shows how to configure an Apache ActiveMQ
Artemis broker to have web browsers and Java applications exchanges messages. Artemis broker to have web browsers and Java applications exchanges messages.
## Flow Control
STOMP clients can use the `consumer-window-size` header on the `SUBSCRIBE`
frame to control the flow of messages to clients. This is broadly discussed in
the [Flow Control](flow-control.md) chapter.
This ability is similiar to the `activemq.prefetchSize` header supported by
ActiveMQ 5.x. However, that header specifies the size in terms of *messages*
whereas `consumer-window-size` specifies the size in terms of *bytes*. ActiveMQ
Artemis supports the `activemq.prefetchSize` header for backwards compatibility
but the value will be interpreted as *bytes* just like `consumer-window-size`
would be. If both `activemq.prefetchSize` and `consumer-window-size` are set
then the value for `consumer-window-size` will be used.
Setting `consumer-window-size` to `0` will ensure that once a STOMP client
receives a message that it will *not* receive another one until it sends the
appropriate `ACK` or `NACK` frame for the message it already has.
Setting `consumer-window-size` to a value *greater than* `0` will allow it to
receive messages until the cumulative bytes of those messages reaches the
configured size. Once that happens the client will not receive any more
messages until it sends the appropriate `ACK` or `NACK` frame for the messages
it already has.
Setting `consumer-window-size` to `-1` means there is no flow control and the
broker will dispatch messages to clients as fast as it can.
Flow control can be configured at the `acceptor` as well using the
`stompConsumerWindowSize` URL parameter. This value is `10240` (i.e. 10K) by
default for clients using `client` and `client-individual` acknowledgement
modes. It is `-1` for clients using the `auto` acknowledgement mode. Even
if `stompConsumerWindowSize` is set on the STOMP `acceptor` it will be
overriden by the value provided by individual clients using the
`consumer-window-size` header on their `SUBSCRIBE` frame.
> **Note:**
>
> The `stompConsumerWindowSize` URL parameter used to be called
> `stompConsumerCredits` but was changed to be more consistent with the new
> header name (i.e. `consumer-window-size`). The `stompConsumerCredits`
> parameter is deprecated but it will still work for the time being.
Using the [DEBUG logging](#logging) mentioned earlier it is possible to see the
size of the `MESSAGE` frames dispatched to clients. This can help when trying
to determine the best `consumer-window-size` setting.

View File

@ -145,8 +145,8 @@ public class DummyServerConsumer implements ServerConsumer {
} }
@Override @Override
public void acknowledge(Transaction tx, long messageID) throws Exception { public List<Long> acknowledge(Transaction tx, long messageID) throws Exception {
return null;
} }
@Override @Override

View File

@ -178,7 +178,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
* @throws Exception * @throws Exception
*/ */
protected ActiveMQServer createServer() throws Exception { protected ActiveMQServer createServer() throws Exception {
String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + TransportConstants.STOMP_CONSUMERS_CREDIT + "=-1"; String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + TransportConstants.STOMP_CONSUMER_WINDOW_SIZE + "=-1";
if (isEnableStompMessageId()) { if (isEnableStompMessageId()) {
stompAcceptorURI += ";" + TransportConstants.STOMP_ENABLE_MESSAGE_ID + "=true"; stompAcceptorURI += ";" + TransportConstants.STOMP_ENABLE_MESSAGE_ID + "=true";
} }
@ -397,6 +397,30 @@ public abstract class StompTestBase extends ActiveMQTestBase {
String selector, String selector,
String destination, String destination,
boolean receipt) throws IOException, InterruptedException { boolean receipt) throws IOException, InterruptedException {
return subscribe(conn, subscriptionId, ack, durableId, selector, destination, receipt, null);
}
public static ClientStompFrame subscribe(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
String selector,
String destination,
boolean receipt,
Integer consumerWindowSize) throws IOException, InterruptedException {
return subscribe(conn, subscriptionId, ack, durableId, selector, destination, receipt, consumerWindowSize, Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
}
public static ClientStompFrame subscribe(StompClientConnection conn,
String subscriptionId,
String ack,
String durableId,
String selector,
String destination,
boolean receipt,
Integer consumerWindowSize,
String consumerWindowSizeHeader) throws IOException, InterruptedException {
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE) ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, RoutingType.ANYCAST.toString()) .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, RoutingType.ANYCAST.toString())
.addHeader(Stomp.Headers.Subscribe.DESTINATION, destination); .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination);
@ -412,6 +436,9 @@ public abstract class StompTestBase extends ActiveMQTestBase {
if (selector != null) { if (selector != null) {
frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector); frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
} }
if (consumerWindowSize != null) {
frame.addHeader(consumerWindowSizeHeader, consumerWindowSize.toString());
}
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
if (receipt) { if (receipt) {

View File

@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase; import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame; import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@ -2368,6 +2369,174 @@ public class StompV12Test extends StompTestBase {
conn.disconnect(); conn.disconnect();
} }
@Test
public void testSubscribeWithZeroConsumerWindowSize() throws Exception {
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE, true);
}
@Test
public void testSubscribeWithZeroConsumerWindowSizeLegacyHeader() throws Exception {
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE, true);
}
@Test
public void testSubscribeWithZeroConsumerWindowSizeAndNack() throws Exception {
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE, false);
}
@Test
public void testSubscribeWithZeroConsumerWindowSizeLegacyHeaderAndNack() throws Exception {
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE, false);
}
private void internalSubscribeWithZeroConsumerWindowSize(String consumerWindowSizeHeader, boolean ack) throws Exception {
final int TIMEOUT = 1000;
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null, getQueuePrefix() + getQueueName(), true, 0, consumerWindowSizeHeader);
sendJmsMessage(getName());
sendJmsMessage(getName());
ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame1);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
String messageID = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
Assert.assertNull(frame2);
if (ack) {
ack(conn, messageID);
} else {
nack(conn, messageID);
}
ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame3);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame3.getCommand());
messageID = frame3.getHeader(Stomp.Headers.Message.MESSAGE_ID);
if (ack) {
ack(conn, messageID);
} else {
nack(conn, messageID);
}
conn.disconnect();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(TIMEOUT);
Assert.assertNull(message);
}
@Test
public void testSubscribeWithNonZeroConsumerWindowSize() throws Exception {
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE, true);
}
@Test
public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeader() throws Exception {
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE, true);
}
@Test
public void testSubscribeWithNonZeroConsumerWindowSizeAndNack() throws Exception {
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE, false);
}
@Test
public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeaderAndNack() throws Exception {
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE, false);
}
private void internalSubscribeWithNonZeroConsumerWindowSize(String consumerWindowSizeHeader, boolean ack) throws Exception {
// the size of each message was determined from the DEBUG logging from org.apache.activemq.artemis.core.protocol.stomp.StompConnection
final int MESSAGE_SIZE = 270;
final int TIMEOUT = 1000;
final String MESSAGE = "foo-foo-foo";
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null, getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2, consumerWindowSizeHeader);
sendJmsMessage(MESSAGE);
sendJmsMessage(MESSAGE);
sendJmsMessage(MESSAGE);
ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame1);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
String messageID1 = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame2);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
Assert.assertNull(frame3);
if (ack) {
ack(conn, messageID1);
ack(conn, messageID2);
} else {
nack(conn, messageID1);
nack(conn, messageID2);
}
ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame4);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
String messageID4 = frame4.getHeader(Stomp.Headers.Message.MESSAGE_ID);
if (ack) {
ack(conn, messageID4);
} else {
nack(conn, messageID4);
}
conn.disconnect();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(TIMEOUT);
Assert.assertNull(message);
}
@Test
public void testSubscribeWithNonZeroConsumerWindowSizeAndClientAck() throws Exception {
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.DEBUG);
// the size of each message was determined from the DEBUG logging from org.apache.activemq.artemis.core.protocol.stomp.StompConnection
final int MESSAGE_SIZE = 270;
final int TIMEOUT = 1000;
final String MESSAGE = "foo-foo-foo";
conn.connect(defUser, defPass);
subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2, Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
sendJmsMessage(MESSAGE);
sendJmsMessage(MESSAGE);
sendJmsMessage(MESSAGE);
sendJmsMessage(MESSAGE);
ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame1);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame2);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
Assert.assertNull(frame3);
// this should clear the first 2 messages since we're using CLIENT ack mode
ack(conn, messageID2);
ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame4);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
ClientStompFrame frame5 = conn.receiveFrame(TIMEOUT);
Assert.assertNotNull(frame5);
Assert.assertEquals(Stomp.Responses.MESSAGE, frame5.getCommand());
String messageID5 = frame5.getHeader(Stomp.Headers.Message.MESSAGE_ID);
// this should clear the next 2 messages
ack(conn, messageID5);
conn.disconnect();
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(TIMEOUT);
Assert.assertNull(message);
}
private void ack(StompClientConnection conn, ClientStompFrame frame) throws IOException, InterruptedException { private void ack(StompClientConnection conn, ClientStompFrame frame) throws IOException, InterruptedException {
String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);