This commit is contained in:
Clebert Suconic 2018-08-29 13:39:52 -04:00
commit ff6a69045e
7 changed files with 22 additions and 5 deletions

View File

@ -86,6 +86,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
// Number of pending calls on flow control
private final ReusableLatch pendingFlowControl = new ReusableLatch(0);
private final int initialWindow;
private final int clientWindowSize;
private final int ackBatchSize;
@ -140,6 +142,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
final SimpleString queueName,
final SimpleString filterString,
final boolean browseOnly,
final int initialWindow,
final int clientWindowSize,
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
@ -164,6 +167,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
sessionExecutor = executor;
this.initialWindow = initialWindow;
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
@ -743,6 +748,11 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
}
}
@Override
public int getInitialWindowSize() {
return initialWindow;
}
@Override
public int getClientWindowSize() {
return clientWindowSize;

View File

@ -55,6 +55,8 @@ public interface ClientConsumerInternal extends ClientConsumer {
int getClientWindowSize();
int getInitialWindowSize();
int getBufferSize();
void cleanUp() throws ActiveMQException;

View File

@ -1888,7 +1888,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore
if (consumer.getClientWindowSize() != 0) {
sessionContext.sendConsumerCredits(consumer, consumer.getClientWindowSize());
sessionContext.sendConsumerCredits(consumer, consumer.getInitialWindowSize());
}
return consumer;

View File

@ -329,7 +329,7 @@ public class ActiveMQSessionContext extends SessionContext {
// The value we send is just a hint
final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, consumerWindowSize, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
@Override

View File

@ -96,7 +96,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
// could be overridden on the queue settings
// The value we send is just a hint
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, windowSize, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
}

View File

@ -1454,7 +1454,7 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
ServerSession ss = messagingService.getSessionByID(((ClientSessionImpl)session).getName());
ServerConsumerImpl cons = (ServerConsumerImpl) ss.locateConsumer(consumer.getConsumerContext().getId());
assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize(), 5000, 500));
assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize() * 2, 5000, 50));
}
@Test
@ -1465,7 +1465,7 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
final AddressSettings settings = new AddressSettings();
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
messagingService.getConfiguration()
.getAddressesSettings().put("#", settings);
.getAddressesSettings().put("#", settings);
messagingService.start();
messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);

View File

@ -735,6 +735,11 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
return 0;
}
@Override
public int getInitialWindowSize() {
return 0;
}
@Override
public SimpleString getFilterString() {