ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

- activemq.notifications are being transferred to the target node, unless an ignore is setup
- topics are being duplicated after redistribution
- topics sends are being duplicated when a 2 node cluster mirrors to another 2 node cluster, and both nodes are mirrored.
This commit is contained in:
Clebert Suconic 2023-04-20 22:21:33 -04:00 committed by Justin Bertram
parent 4e77a34c29
commit 2a81a0a3c6
16 changed files with 734 additions and 114 deletions

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
@ -76,6 +77,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
/** When a clustered node (from regular cluster connections) receives a message
it will have target queues associated with it
this could be from message redistribution or simply load balancing.
an that case this will have the queue associated with it */
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");
// Capabilities
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
@ -83,7 +90,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true));
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));
final Queue snfQueue;
final ActiveMQServer server;
@ -228,13 +235,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
public void sendMessage(Transaction tx, Message message, RoutingContext context) {
SimpleString address = context.getAddress(message);
if (invalidTarget(context.getMirrorSource())) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
return;
}
if (context.isInternal()) {
logger.trace("sendMessage::server {} is discarding send to avoid sending to internal queue", server);
if (invalidTarget(context.getMirrorSource())) {
logger.trace("sendMessage::server {} is discarding send to avoid infinite loop (reflection with the mirror)", server);
return;
}
@ -256,7 +263,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
}
MessageReference ref = MessageReference.Factory.createReference(message, snfQueue);
setProtocolData(ref, nodeID, idSupplier.getID(ref));
setProtocolData(ref, nodeID, idSupplier.getID(ref), context);
snfQueue.refUp(ref);
@ -330,12 +337,12 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
String brokerID = referenceIDSupplier.getServerID(ref);
long id = referenceIDSupplier.getID(ref);
setProtocolData(ref, brokerID, id);
setProtocolData(ref, brokerID, id, null);
return brokerID;
}
private static void setProtocolData(MessageReference ref, String brokerID, long id) {
private static void setProtocolData(MessageReference ref, String brokerID, long id, RoutingContext routingContext) {
Map<Symbol, Object> daMap = new HashMap<>();
DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(daMap);
@ -357,6 +364,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
}
}
if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
ArrayList<String> queues = new ArrayList<>();
routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
daMap.put(TARGET_QUEUES, queues);
}
ref.setProtocolData(DeliveryAnnotations.class, deliveryAnnotations);
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
import java.util.Collection;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
@ -29,6 +30,8 @@ import org.apache.activemq.artemis.core.io.RunnableCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -73,6 +76,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.POST_ACK;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.QUEUE;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
import static org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.TARGET_QUEUES;
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
@ -436,6 +440,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
Long internalIDLong = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_ID);
String internalAddress = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, INTERNAL_DESTINATION);
Collection<String> targetQueues = (Collection) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(message, TARGET_QUEUES);
long internalID = 0;
if (internalIDLong != null) {
@ -479,8 +485,13 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.LOCAL_ONLY);
if (targetQueues != null) {
targetQueuesRouting(message, routingContext, targetQueues);
server.getPostOffice().processRoute(message, routingContext, false);
} else {
server.getPostOffice().route(message, routingContext, false);
}
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();
flow();
@ -489,6 +500,24 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
return true;
}
/** When the source mirror receives messages from a cluster member of his own, it should then fill targetQueues so we could play the same semantic the source applied on its routing */
private void targetQueuesRouting( final Message message,
final RoutingContext context,
final Collection<String> queueNames) throws Exception {
Bindings bindings = server.getPostOffice().getBindingsForAddress(message.getAddressSimpleString());
queueNames.forEach(name -> {
Binding binding = bindings.getBinding(name);
if (binding != null) {
try {
binding.route(message, context);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
});
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
// Do nothing

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.postoffice;
import java.util.Collection;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -32,6 +33,8 @@ public interface Bindings extends UnproposalListener {
Collection<Binding> getBindings();
Binding getBinding(String name);
void addBinding(Binding binding);
Binding removeBindingByUniqueName(SimpleString uniqueName);
@ -55,4 +58,8 @@ public interface Bindings extends UnproposalListener {
void route(Message message, RoutingContext context) throws Exception;
boolean allowRedistribute();
void forEach(BiConsumer<SimpleString, Binding> bindingConsumer);
int size();
}

View File

@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
@ -51,6 +52,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;
public final class BindingsImpl implements Bindings {
@ -119,6 +121,11 @@ public final class BindingsImpl implements Bindings {
}
}
@Override
public Binding getBinding(String name) {
return bindingsNameMap.get(SimpleString.toSimpleString(name));
}
@Override
public void addBinding(final Binding binding) {
try {
@ -183,6 +190,17 @@ public final class BindingsImpl implements Bindings {
return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
}
@Override
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
bindingsNameMap.forEach(bindingConsumer);
}
@Override
public int size() {
return bindingsNameMap.size();
}
@Override
public Message redistribute(final Message message,
final Queue originatingQueue,
@ -439,6 +457,10 @@ public final class BindingsImpl implements Bindings {
}
}
if (loadBalancingType.equals(MessageLoadBalancingType.LOCAL_ONLY) && binding instanceof RemoteQueueBinding) {
return false;
}
final Filter filter = binding.getFilter();
if (filter == null || filter.match(message)) {
@ -577,6 +599,9 @@ public final class BindingsImpl implements Bindings {
private void routeFromCluster(final Message message,
final RoutingContext context,
final byte[] ids) throws Exception {
if (!context.isMirrorDisabled()) {
context.setMirrorOption(MirrorOption.individualRoute);
}
byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);
List<Long> idsToAckList = new ArrayList<>();

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server;
/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
* */
public enum MirrorOption {
enabled,
disabled,
individualRoute
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -40,9 +41,15 @@ public interface RoutingContext {
/** If the routing is from MirrorController, we don't redo mirrorController
* to avoid*/
MirrorOption getMirrorOption();
void forEachDurable(Consumer<Queue> consumer);
RoutingContext setMirrorOption(MirrorOption option);
boolean isMirrorDisabled();
RoutingContext setMirrorDisabled(boolean mirrorDisabled);
boolean isMirrorIndividualRoute();
/** return true if every queue routed is internal */
boolean isInternal();

View File

@ -784,7 +784,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
} else {
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
queue = server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true).setMaxConsumers(-1).setPurgeOnNoConsumers(false).setInternal(true));
}
// There are a few things that will behave differently when it's an internal queue

View File

@ -17,5 +17,5 @@
package org.apache.activemq.artemis.core.server.cluster.impl;
public enum MessageLoadBalancingType {
OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION;
OFF, STRICT, ON_DEMAND, OFF_WITH_REDISTRIBUTION, LOCAL_ONLY; // notice that LOCAL_ONLY is an internal use only option. When Mirror sends a message to a target mirror, messages should be routed locally only and to not any other cluster.
}

View File

@ -3986,7 +3986,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callBrokerQueuePlugins(plugin -> plugin.beforeCreateQueue(queueConfiguration));
}
if (mirrorControllerService != null) {
if (mirrorControllerService != null && !queueConfiguration.isInternal()) {
mirrorControllerService.createQueue(queueConfiguration);
}

View File

@ -82,6 +82,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -707,6 +708,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.server = server;
if (queueConfiguration.isInternal()) {
this.internalQueue = queueConfiguration.isInternal();
}
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
if (addressSettingsRepository != null) {
@ -3540,7 +3545,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// we Disable mirror on expiration as the target might be also expiring it
// and this could cause races
// we will only send the ACK for the expiration with the reason=EXPIRE and the expire will be played on the mirror side
context.setMirrorDisabled(true);
context.setMirrorOption(MirrorOption.disabled);
}
routingStatus = postOffice.route(copyMessage, context, false, rejectDuplicate, binding);

View File

@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
public class RoutingContextImpl implements RoutingContext {
// The pair here is Durable and NonDurable
private final Map<SimpleString, RouteContextList> map = new HashMap<>();
private Transaction transaction;
@ -63,7 +64,7 @@ public class RoutingContextImpl implements RoutingContext {
volatile int version;
boolean mirrorDisabled = false;
MirrorOption mirrorOption = MirrorOption.enabled;
private boolean duplicateDetection = true;
@ -85,13 +86,30 @@ public class RoutingContextImpl implements RoutingContext {
}
@Override
public boolean isMirrorDisabled() {
return mirrorDisabled;
public MirrorOption getMirrorOption() {
return mirrorOption;
}
@Override
public RoutingContextImpl setMirrorDisabled(boolean mirrorDisabled) {
this.mirrorDisabled = mirrorDisabled;
public boolean isMirrorDisabled() {
return mirrorOption == MirrorOption.disabled;
}
@Override
public void forEachDurable(Consumer<Queue> queueConsumer) {
map.forEach((a, b) -> {
b.getDurableQueues().forEach(queueConsumer);
});
}
@Override
public boolean isMirrorIndividualRoute() {
return mirrorOption == MirrorOption.individualRoute;
}
@Override
public RoutingContextImpl setMirrorOption(MirrorOption mirrorOption) {
this.mirrorOption = mirrorOption;
return this;
}
@ -150,6 +168,10 @@ public class RoutingContextImpl implements RoutingContext {
this.internalOnly = null;
if (mirrorOption == MirrorOption.individualRoute) {
mirrorOption = MirrorOption.enabled;
}
return this;
}

View File

@ -725,6 +725,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
AuditLogger.createQueue(this, remotingConnection.getSubject(), remotingConnection.getRemoteAddress(), queueConfiguration);
}
if (queueConfiguration.getAddress().equals(server.getConfiguration().getManagementNotificationAddress())) {
queueConfiguration.setInternal(true);
}
queueConfiguration
.setRoutingType(getRoutingTypeFromPrefix(queueConfiguration.getAddress(), queueConfiguration.getRoutingType()))
.setAddress(removePrefix(queueConfiguration.getAddress()))

View File

@ -22,7 +22,6 @@ import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
@ -41,7 +40,6 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
@ -78,25 +76,8 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
protected static final Symbol SHARED = Symbol.getSymbol("shared");
protected static final Symbol GLOBAL = Symbol.getSymbol("global");
protected static final String BROKER_NAME = "localhost";
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
protected String browseUser = "browser";
protected String browsePass = "browser";
protected String guestUser = "guest";
protected String guestPass = "guest";
protected String fullUser = "user";
protected String fullPass = "pass";
protected ActiveMQServer server;
protected MBeanServer mBeanServer = createMBeanServer();
@Before
@Override
public void setUp() throws Exception {
@ -170,68 +151,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return createServer(port, true);
}
protected ActiveMQServer createServer(int port, boolean start) throws Exception {
final ActiveMQServer server = this.createServer(true, true);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
server.getConfiguration().setName(BROKER_NAME);
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
if (port == AMQP_PORT) {
// we use the default large directory if the default port
// as some tests will assert number of files
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
} else {
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port);
}
server.getConfiguration().setJMXManagementEnabled(true);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.setMBeanServer(mBeanServer);
// Add any additional Acceptors needed for tests
addAdditionalAcceptors(server);
// Address configuration
configureAddressPolicy(server);
// Add optional security for tests that need it
configureBrokerSecurity(server);
// Add extra configuration
addConfiguration(server);
if (start) {
server.start();
// Prepare all addresses and queues for client tests.
createAddressAndQueues(server);
}
return server;
}
protected void addConfiguration(ActiveMQServer server) {
}
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
HashMap<String, Object> amqpParams = new HashMap<>();
configureAMQPAcceptorParameters(amqpParams);
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
configureAMQPAcceptorParameters(tc);
return tc;
}
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE";
}
@Override
protected void configureAddressPolicy(ActiveMQServer server) {
// Address configuration
AddressSettings addressSettings = new AddressSettings();
@ -252,6 +172,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
}
@Override
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
// Default Queue
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
@ -272,10 +193,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
}
}
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
// None by default
}
@Override
protected void configureBrokerSecurity(ActiveMQServer server) {
if (isSecurityEnabled()) {
enableSecurity(server);
@ -313,14 +231,6 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
server.getConfiguration().setSecurityEnabled(true);
}
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
// None by default
}
protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
// None by default
}
public Queue getProxyToQueue(String queueName) {
return server.locateQueue(SimpleString.toSimpleString(queueName));
}

View File

@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.management.MBeanServer;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -35,6 +41,21 @@ import org.junit.After;
*/
public class AmqpTestSupport extends ActiveMQTestBase {
protected static final String BROKER_NAME = "localhost";
protected static final String NETTY_ACCEPTOR = "netty-acceptor";
protected String noprivUser = "noprivs";
protected String noprivPass = "noprivs";
protected String browseUser = "browser";
protected String browsePass = "browser";
protected String guestUser = "guest";
protected String guestPass = "guest";
protected String fullUser = "user";
protected String fullPass = "pass";
protected static final int AMQP_PORT = 5672;
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
@ -47,6 +68,8 @@ public class AmqpTestSupport extends ActiveMQTestBase {
return connection;
}
protected MBeanServer mBeanServer = createMBeanServer();
@After
@Override
public void tearDown() throws Exception {
@ -148,4 +171,83 @@ public class AmqpTestSupport extends ActiveMQTestBase {
}
protected ActiveMQServer createServer(int port, boolean start) throws Exception {
final ActiveMQServer server = this.createServer(true, true);
server.getConfiguration().getAcceptorConfigurations().clear();
server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port));
server.getConfiguration().setName(BROKER_NAME);
server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port);
server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port);
server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port);
if (port == AMQP_PORT) {
// we use the default large directory if the default port
// as some tests will assert number of files
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory());
} else {
server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port);
}
server.getConfiguration().setJMXManagementEnabled(true);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.setMBeanServer(mBeanServer);
// Add any additional Acceptors needed for tests
addAdditionalAcceptors(server);
// Address configuration
configureAddressPolicy(server);
// Add optional security for tests that need it
configureBrokerSecurity(server);
// Add extra configuration
addConfiguration(server);
if (start) {
server.start();
// Prepare all addresses and queues for client tests.
createAddressAndQueues(server);
}
return server;
}
protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
}
protected void addConfiguration(ActiveMQServer server) {
}
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
}
protected void configureAddressPolicy(ActiveMQServer server) {
}
protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) {
HashMap<String, Object> params = new HashMap<>();
params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols());
HashMap<String, Object> amqpParams = new HashMap<>();
configureAMQPAcceptorParameters(amqpParams);
TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams);
configureAMQPAcceptorParameters(tc);
return tc;
}
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE";
}
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
}
protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
}
protected void configureBrokerSecurity(ActiveMQServer server) {
}
}

View File

@ -0,0 +1,454 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MirrorOption;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQPRedistributeClusterTest extends AmqpTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String QUEUE_NAME = "REDIST_QUEUE";
private static final String TOPIC_NAME = "REDIST_TOPIC";
private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
protected static final int A_1_PORT = 5673;
protected static final int A_2_PORT = 5674;
ActiveMQServer a1;
ActiveMQServer a2;
protected static final int B_1_PORT = 5773;
protected static final int B_2_PORT = 5774;
ActiveMQServer b1;
ActiveMQServer b2;
@Before
public void setCluster() throws Exception {
a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
a1.start();
a2.start();
b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
b1.start();
b2.start();
}
private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
ActiveMQServer server = createServer(thisPort, false);
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
server.getConfiguration().clearAddressSettings();
server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
server.setIdentity(name);
server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
server.getConfiguration().addClusterConfiguration(clusterConfiguration);
if (mirrorPort > 0) {
server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
}
return server;
}
@Test
public void testQueueRedistributionAMQP() throws Exception {
internalQueueRedistribution("AMQP");
}
@Test
public void testQueueRedistributionCORE() throws Exception {
internalQueueRedistribution("CORE");
}
public void internalQueueRedistribution(String protocol) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter((AssertionLoggerHandler::stopCapture));
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
try (Connection conn = cfA1.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("Hello" + i));
}
}
try (Connection connA1 = cfA1.createConnection();
Connection connA2 = cfA2.createConnection()) {
connA1.start();
connA2.start();
Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < 100; i++) {
MessageConsumer consumer;
String place;
if (i % 2 == 0) {
place = "A1";
consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
} else {
place = "A2";
consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
}
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
logger.debug("Received message {} from {}", message, place);
consumer.close();
}
}
assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
// if you see this message, most likely the notifications are being copied to the mirror
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
}
@Test
public void testTopicRedistributionAMQP() throws Exception {
internalTopicRedistribution("AMQP");
}
@Test
public void testTopicRedistributionCORE() throws Exception {
internalTopicRedistribution("CORE");
}
public void internalTopicRedistribution(String protocol) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter((AssertionLoggerHandler::stopCapture));
final int numMessages = 100;
String subscriptionName = "my-topic-shared-subscription";
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
Topic topic;
try (Connection conn = cfA1.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
consumer.close();
}
try (Connection conn = cfA2.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
consumer.close();
}
Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
// naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
String subscriptionQueueName;
{
HashSet<String> subscriptionSet = new HashSet<>();
// making sure the queues created on a1 are propaged into b1
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
logger.debug("{} = {}", n, b);
if (b instanceof LocalQueueBinding) {
QueueBinding qb = (QueueBinding) b;
subscriptionSet.add(qb.getUniqueName().toString());
Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
}
});
Assert.assertEquals(1, subscriptionSet.size());
subscriptionQueueName = subscriptionSet.iterator().next();
}
// making sure the queues created on a2 are propaged into b2
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
logger.debug("{} = {}", n, b);
if (b instanceof LocalQueueBinding) {
QueueBinding qb = (QueueBinding) b;
Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
}
});
Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
Assert.assertNotNull(a1TopicSubscription);
Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
Assert.assertNotNull(a2TopicSubscription);
Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
Assert.assertNotNull(b1TopicSubscription);
Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
Assert.assertNotNull(a2);
try (Connection conn = cfA1.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < numMessages; i++) {
producer.send(session.createTextMessage("Hello" + i));
}
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), b2TopicSubscription.getMessageCount());
Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
try (Connection connA1 = cfA1.createConnection();
Connection connA2 = cfA2.createConnection()) {
connA1.start();
connA2.start();
Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i = 0; i < numMessages; i++) {
MessageConsumer consumer;
String place;
if (i % 2 == 0) {
place = "A1";
consumer = sessionA1.createSharedDurableConsumer(topic, subscriptionName);
} else {
place = "A2";
consumer = sessionA2.createSharedDurableConsumer(topic, subscriptionName);
}
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
logger.debug("Received message {} from {}", message, place);
consumer.close();
}
}
// if you see this message, most likely the notifications are being copied to the mirror
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
assertEmptyQueue(a1TopicSubscription);
assertEmptyQueue(a2TopicSubscription);
assertEmptyQueue(b1TopicSubscription);
assertEmptyQueue(b2TopicSubscription);
}
// This test is playing with Remote binding routing, similarly to how topic redistribution would happen
@Test
public void testRemoteBindingRouting() throws Exception {
final String protocol = "AMQP";
String subscriptionName = "my-topic-shared-subscription";
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
Topic topic;
try (Connection conn = cfA1.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
}
}
try (Connection conn = cfA2.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
}
}
Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
// making sure the queues created on a2 are propaged into b2
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
logger.debug("{} = {}", a, b);
remoteQueueBindings_a2.add((RemoteQueueBinding) b);
}
});
Assert.assertEquals(1, remoteQueueBindings_a2.size());
RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
directMessage.setAddress(TOPIC_NAME);
directMessage.putStringProperty("Test", "t1");
remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
a2.getPostOffice().processRoute(directMessage, routingContext, false);
routingContext.getTransaction().commit();
for (int i = 0; i < 10; i++) {
String name = "my-topic-shared-subscription_" + i + ":global";
Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
}
}
// This test has distinct subscriptions on each node and it is making sure the Mirror Routing is working accurately
@Test
public void testMultiNodeSubscription() throws Exception {
final String protocol = "AMQP";
String subscriptionName = "my-topic-shared-subscription";
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
Topic topic;
try (Connection conn = cfA1.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
}
}
try (Connection conn = cfA2.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
for (int i = 10; i < 20; i++) {
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
}
}
Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
try (Connection conn = cfA2.createConnection()) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
producer.send(session.createTextMessage("hello"));
}
for (int i = 0; i < 10; i++) {
String name = "my-topic-shared-subscription_" + i + ":global";
Wait.waitFor(() -> a1.locateQueue(name) != null);
Wait.waitFor(() -> b1.locateQueue(name) != null);
Wait.assertEquals(1, a1.locateQueue(name)::getMessageCount);
Wait.assertEquals(1, b1.locateQueue(name)::getMessageCount);
}
for (int i = 10; i < 20; i++) {
String name = "my-topic-shared-subscription_" + i + ":global";
Wait.waitFor(() -> a2.locateQueue(name) != null);
Wait.waitFor(() -> b2.locateQueue(name) != null);
Wait.assertEquals(1, a2.locateQueue(name)::getMessageCount);
Wait.assertEquals(1, b2.locateQueue(name)::getMessageCount);
}
}
private void assertEmptyQueue(Queue queue) {
Assert.assertNotNull(queue);
try {
Wait.assertEquals(0, queue::getMessageCount);
} catch (Throwable e) {
if (e instanceof AssertionError) {
throw (AssertionError) e;
} else {
throw new RuntimeException(e.getMessage(), e);
}
}
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
}

View File

@ -46,6 +46,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.BiConsumer;
/**
* This test is replicating the behaviour from https://issues.jboss.org/browse/HORNETQ-988.
@ -464,6 +465,21 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
}
@Override
public Binding getBinding(String name) {
return null;
}
@Override
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
bindings.forEach(bindingConsumer);
}
@Override
public int size() {
return bindings.size();
}
@Override
public MessageLoadBalancingType getMessageLoadBalancingType() {
return null;