diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index 5e3d95e07f..4946efb960 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -86,6 +86,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { // Number of pending calls on flow control private final ReusableLatch pendingFlowControl = new ReusableLatch(0); + private final int initialWindow; + private final int clientWindowSize; private final int ackBatchSize; @@ -140,6 +142,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { final SimpleString queueName, final SimpleString filterString, final boolean browseOnly, + final int initialWindow, final int clientWindowSize, final int ackBatchSize, final TokenBucketLimiter rateLimiter, @@ -164,6 +167,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { sessionExecutor = executor; + this.initialWindow = initialWindow; + this.clientWindowSize = clientWindowSize; this.ackBatchSize = ackBatchSize; @@ -743,6 +748,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { } } + @Override + public int getInitialWindowSize() { + return initialWindow; + } + @Override public int getClientWindowSize() { return clientWindowSize; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java index 986f397d25..819082134b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerInternal.java @@ -55,6 +55,8 @@ public interface ClientConsumerInternal extends ClientConsumer { int getClientWindowSize(); + int getInitialWindowSize(); + int getBufferSize(); void cleanUp() throws ActiveMQException; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index f1ef526f58..2edf6292c6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -1888,7 +1888,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi // TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore if (consumer.getClientWindowSize() != 0) { - sessionContext.sendConsumerCredits(consumer, consumer.getClientWindowSize()); + sessionContext.sendConsumerCredits(consumer, consumer.getInitialWindowSize()); } return consumer; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 4ee7ee9a79..18227cbd54 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -329,7 +329,7 @@ public class ActiveMQSessionContext extends SessionContext { // The value we send is just a hint final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize; - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } @Override diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java index ee3520fa1e..f06772cb39 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java @@ -96,7 +96,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext { // could be overridden on the queue settings // The value we send is just a hint - return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); + return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java index e58f4bcaa1..2fb737c553 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java @@ -1454,7 +1454,7 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase { ServerSession ss = messagingService.getSessionByID(((ClientSessionImpl)session).getName()); ServerConsumerImpl cons = (ServerConsumerImpl) ss.locateConsumer(consumer.getConsumerContext().getId()); - assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize(), 5000, 500)); + assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize() * 2, 5000, 50)); } @Test @@ -1465,7 +1465,7 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase { final AddressSettings settings = new AddressSettings(); settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize); messagingService.getConfiguration() - .getAddressesSettings().put("#", settings); + .getAddressesSettings().put("#", settings); messagingService.start(); messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java index c1790d0d60..a68382f79c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/client/impl/LargeMessageBufferTest.java @@ -735,6 +735,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase { return 0; } + @Override + public int getInitialWindowSize() { + return 0; + } + @Override public SimpleString getFilterString() {