ARTEMIS-4132 AMQP Receiver default to ANYCAST when creating an address

When an AMQP client subscribes to a new address (non-existing) with a receiver link, the
address is created with routing type ANYCAST regardles of the default address creation
configuration of the broker, and ignores even the broker wide default of MULTICAST.
This commit is contained in:
Timothy Bish 2023-01-23 12:38:16 -05:00
parent 0d3cd8d880
commit ca66028b2a
4 changed files with 256 additions and 4 deletions

View File

@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
@ -960,7 +962,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
SimpleString tempQueueName;
String selector;
private final RoutingType defaultRoutingType = RoutingType.ANYCAST;
private RoutingType routingTypeToUse = RoutingType.ANYCAST;
private boolean isVolatile = false;
@ -1110,8 +1111,15 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
} else {
// if not we look up the address
AddressQueryResult addressQueryResult = null;
// Set this to the broker configured default for the address prior to the lookup so that
// an auto create will actually use the configured defaults. The actual query result will
// contain the true answer on what routing type the address actually has though.
final RoutingType routingType = sessionSPI.getDefaultRoutingType(addressToUse);
routingTypeToUse = routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType;
try {
addressQueryResult = sessionSPI.addressQuery(addressToUse, defaultRoutingType, true);
addressQueryResult = sessionSPI.addressQuery(addressToUse, routingTypeToUse, true);
} catch (ActiveMQSecurityException e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(e.getMessage());
} catch (ActiveMQAMQPException e) {

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class AutoCreateWithDefaultRoutingTypesTest extends JMSClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Parameterized.Parameters(name = "routingType={0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{RoutingType.ANYCAST}, {RoutingType.MULTICAST}
});
}
@Parameterized.Parameter(0)
public RoutingType routingType;
@Override
protected String getConfiguredProtocols() {
return "AMQP";
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
// Don't create anything by default since we are testing auto create
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
Configuration serverConfig = server.getConfiguration();
serverConfig.setJournalType(JournalType.NIO);
Map<String, AddressSettings> map = serverConfig.getAddressSettings();
if (map.size() == 0) {
AddressSettings as = new AddressSettings();
map.put("#", as);
}
Map.Entry<String, AddressSettings> entry = map.entrySet().iterator().next();
AddressSettings settings = entry.getValue();
settings.setAutoCreateQueues(true);
settings.setDefaultAddressRoutingType(routingType);
settings.setDefaultQueueRoutingType(routingType);
logger.info("server config, isauto? {}", entry.getValue().isAutoCreateQueues());
logger.info("server config, default queue routing type? {}", entry.getValue().getDefaultQueueRoutingType());
logger.info("server config, default address routing type? {}", entry.getValue().getDefaultAddressRoutingType());
}
@Test(timeout = 30_000)
public void testCreateSender() throws Exception {
final String addressName = "sender-address";
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(addressName);
AddressQueryResult address = getProxyToAddress(addressName);
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());
sender.close();
connection.close();
}
@Test(timeout = 30_000)
public void testCreateReceiver() throws Exception {
final String addressName = "receiver-address";
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(addressName);
AddressQueryResult address = getProxyToAddress(addressName);
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());
receiver.close();
connection.close();
}
@Test(timeout = 30_000)
public void testCreateSenderThatRequestsMultiCast() throws Exception {
dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
}
@Test(timeout = 30_000)
public void testCreateSenderThatRequestsAnyCast() throws Exception {
dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
}
private void dotestCreateSenderThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception {
final String addressName = "sender-defined-address";
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Target target = new Target();
target.setAddress(addressName);
if (routingType == RoutingType.ANYCAST) {
target.setCapabilities(QUEUE_CAPABILITY);
} else {
target.setCapabilities(TOPIC_CAPABILITY);
}
AmqpSender sender = session.createSender(target);
AddressQueryResult address = getProxyToAddress(addressName);
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());
sender.close();
connection.close();
}
@Test(timeout = 30_000)
public void testCreateReceiverThatRequestsMultiCast() throws Exception {
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.MULTICAST);
}
@Test(timeout = 30_000)
public void testCreateReceiverThatRequestsAnyCast() throws Exception {
dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType.ANYCAST);
}
private void dotestCreateReceiverThatRequestsSpecificRoutingType(RoutingType routingType) throws Exception {
final String addressName = "receiver-defined-address";
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Source source = new Source();
source.setAddress(addressName);
if (routingType == RoutingType.ANYCAST) {
source.setCapabilities(QUEUE_CAPABILITY);
} else {
source.setCapabilities(TOPIC_CAPABILITY);
}
AmqpReceiver receiver = session.createReceiver(source);
AddressQueryResult address = getProxyToAddress(addressName);
assertNotNull(address);
assertEquals(Set.of(routingType), address.getRoutingTypes());
receiver.close();
connection.close();
}
public AddressQueryResult getProxyToAddress(String addressName) throws Exception {
return server.addressQuery(SimpleString.toSimpleString(addressName));
}
}

View File

@ -197,6 +197,7 @@ public class BrokerDefinedAnycastConsumerTest extends AmqpClientTestSupport {
server.getAddressSettingsRepository().clear();
AddressSettings settings = new AddressSettings();
settings.setAutoCreateAddresses(true);
settings.setDefaultAddressRoutingType(RoutingType.ANYCAST);
server.getAddressSettingsRepository().addMatch(address.toString(), settings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());

View File

@ -16,15 +16,23 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.QUEUE_CAPABILITY;
import static org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper.TOPIC_CAPABILITY;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.junit.Test;
public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@ -33,12 +41,15 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameName() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(address.toString());
Source source = new Source();
source.setAddress(address.toString());
source.setCapabilities(QUEUE_CAPABILITY);
AmqpReceiver receiver = session.createReceiver(source);
sendMessages(address.toString(), 1);
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
@ -48,4 +59,31 @@ public class ClientDefinedAnycastConsumerTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testConsumeFromSingleQueueOnAddressSameNameNegativeValidation() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
Source source = new Source();
source.setAddress(address.toString());
source.setCapabilities(TOPIC_CAPABILITY);
AmqpReceiver receiver = session.createReceiver(source);
sendMessages(address.toString(), 1);
receiver.flow(1);
AmqpMessage amqpMessage = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
Bindings bindings = server.getPostOffice().getBindingsForAddress(address);
assertEquals(1, bindings.getBindings().size());
bindings.getBindings().forEach((binding) -> {
final Queue localQueue = ((LocalQueueBinding) binding).getQueue();
assertEquals(1, localQueue.getConsumerCount());
assertEquals(RoutingType.MULTICAST, localQueue.getRoutingType());
});
receiver.close();
connection.close();
}
}