This commit is contained in:
Clebert Suconic 2018-08-26 18:25:47 -04:00
commit e020090551
3 changed files with 40 additions and 7 deletions

View File

@ -1887,8 +1887,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
// consumer
// TODO: this could semantically change on other servers. I know for instance on stomp this is just an ignore
if (windowSize != 0) {
sessionContext.sendConsumerCredits(consumer, windowSize);
if (consumer.getClientWindowSize() != 0) {
sessionContext.sendConsumerCredits(consumer, consumer.getClientWindowSize());
}
return consumer;

View File

@ -876,7 +876,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
QueueQueryResult response;
Binding binding = getPostOffice().getBinding(name);
final SimpleString addressName = binding != null && binding.getType() == BindingType.LOCAL_QUEUE
? binding.getAddress() : name;
final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString());
boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
@ -885,10 +892,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
QueueQueryResult response;
Binding binding = getPostOffice().getBinding(name);
SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {

View File

@ -36,15 +36,18 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@ -1427,6 +1430,33 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
}
@Test
public void testConsumerWindowSizeAddressSettingsDifferentAddressAndQueueName() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());
final int defaultConsumerWindowSize = 1024 * 5;
final AddressSettings settings = new AddressSettings();
settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
messagingService.getConfiguration()
.getAddressesSettings().put(addressA.toString(), settings);
messagingService.start();
messagingService.createQueue(addressA, RoutingType.ANYCAST, queueA, null, true, false);
ClientSessionFactory cf = createSessionFactory(locator);
ClientSession session = cf.createSession(false, true, true);
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
session.start();
assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
ServerSession ss = messagingService.getSessionByID(((ClientSessionImpl)session).getName());
ServerConsumerImpl cons = (ServerConsumerImpl) ss.locateConsumer(consumer.getConsumerContext().getId());
assertTrue(Wait.waitFor(() -> cons.getAvailableCredits().get() == consumer.getClientWindowSize(), 5000, 500));
}
@Test
public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());