ARTEMIS-2052 - Fix defaultConsumerWindowSize negotiation
First, QueueQuery should use address name for address settings The name used for looking up address settings for a queue now uses the address name if there is a local queue binding Second, make sure sent credits to the server is the correct value
This commit is contained in:
parent
f8140b91d4
commit
87f393e597
|
@ -1887,8 +1887,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
||||||
// consumer
|
// consumer
|
||||||
|
|
||||||
// TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore
|
// TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore
|
||||||
if (windowSize != 0) {
|
if (consumer.getClientWindowSize() != 0) {
|
||||||
sessionContext.sendConsumerCredits(consumer, windowSize);
|
sessionContext.sendConsumerCredits(consumer, consumer.getClientWindowSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
return consumer;
|
return consumer;
|
||||||
|
|
|
@ -876,7 +876,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
|
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
|
QueueQueryResult response;
|
||||||
|
|
||||||
|
Binding binding = getPostOffice().getBinding(name);
|
||||||
|
|
||||||
|
final SimpleString addressName = binding != null && binding.getType() == BindingType.LOCAL_QUEUE
|
||||||
|
? binding.getAddress() : name;
|
||||||
|
|
||||||
|
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
|
||||||
|
|
||||||
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
|
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
|
||||||
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
|
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
|
||||||
|
@ -885,10 +892,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
|
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
|
||||||
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
|
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
|
||||||
|
|
||||||
QueueQueryResult response;
|
|
||||||
|
|
||||||
Binding binding = getPostOffice().getBinding(name);
|
|
||||||
|
|
||||||
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
|
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
|
||||||
|
|
||||||
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
|
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
|
||||||
|
|
|
@ -36,15 +36,18 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
@ -1427,6 +1430,33 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
|
||||||
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
|
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConsumerWindowSizeAddressSettingsDifferentAddressAndQueueName() throws Exception {
|
||||||
|
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||||
|
|
||||||
|
final int defaultConsumerWindowSize = 1024 * 5;
|
||||||
|
final AddressSettings settings = new AddressSettings();
|
||||||
|
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
|
||||||
|
messagingService.getConfiguration()
|
||||||
|
.getAddressesSettings().put(addressA.toString(), settings);
|
||||||
|
|
||||||
|
messagingService.start();
|
||||||
|
messagingService.createQueue(addressA, RoutingType.ANYCAST, queueA, null, true, false);
|
||||||
|
|
||||||
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
|
ClientSession session = cf.createSession(false, true, true);
|
||||||
|
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
|
||||||
|
|
||||||
|
session.start();
|
||||||
|
|
||||||
|
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
|
||||||
|
|
||||||
|
ServerSession ss = messagingService.getSessionByID(((ClientSessionImpl)session).getName());
|
||||||
|
ServerConsumerImpl cons = (ServerConsumerImpl) ss.locateConsumer(consumer.getConsumerContext().getId());
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize(), 5000, 500));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
|
public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
|
||||||
ActiveMQServer messagingService = createServer(false, isNetty());
|
ActiveMQServer messagingService = createServer(false, isNetty());
|
||||||
|
|
Loading…
Reference in New Issue