ARTEMIS-3753 Prevent sending message to internal queues on mirror

In cluster configuration messages could be routed to internal queues for
further delivering on different broker. We need to check that before
sending to SNF, otherwise message can stuck on target server and will
never receive ACK.

co-author: Clebert Suconic

Discusssions on https://github.com/apache/activemq-artemis/pull/4012 and https://github.com/apache/activemq-artemis/pull/4038
This commit is contained in:
iliya 2022-04-05 00:21:37 +03:00 committed by Clebert Suconic
parent c6bfe34f9e
commit 99302b1935
7 changed files with 1176 additions and 6 deletions

View File

@ -201,6 +201,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return;
}
if (context.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid sending to internal queue");
}
return;
}
if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -459,7 +460,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
routingContext.clear().setMirrorSource(this);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();

View File

@ -325,6 +325,14 @@ public final class BindingsImpl implements Bindings {
}
}
private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext context) {
if (context.getLoadBalancingType() != null) {
return context.getLoadBalancingType();
} else {
return this.messageLoadBalancingType;
}
}
private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
@ -333,7 +341,7 @@ public final class BindingsImpl implements Bindings {
}
routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
final Binding nextBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
@ -362,7 +370,8 @@ public final class BindingsImpl implements Bindings {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
final CopyOnWriteBindings.BindingIndex bindingIndex) {
final CopyOnWriteBindings.BindingIndex bindingIndex,
final MessageLoadBalancingType loadBalancingType) {
int nextPosition = bindingIndex.getIndex();
final int bindingsCount = bindings.length;
@ -373,8 +382,6 @@ public final class BindingsImpl implements Bindings {
Binding nextBinding = null;
int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
@ -438,7 +445,7 @@ public final class BindingsImpl implements Bindings {
if (resp == null) {
// ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, bindings, nextPosition);
Binding theBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();
/** return true if every queue routed is internal */
boolean isInternal();
MirrorController getMirrorSource();
RoutingContext setMirrorSource(MirrorController mirrorController);
@ -95,5 +99,9 @@ public interface RoutingContext {
RoutingContext setDuplicateDetection(boolean value);
RoutingContext setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);
MessageLoadBalancingType getLoadBalancingType();
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@ -52,6 +53,9 @@ public class RoutingContextImpl implements RoutingContext {
private RoutingType previousRoutingType;
// if we wanted to bypass the load balancing configured elsewhere
private MessageLoadBalancingType loadBalancingType;
/* To be set by the Mirror target on the server, to avoid ping pongs or reflections of messages between mirrors */
private MirrorController mirrorControllerSource;
@ -59,6 +63,8 @@ public class RoutingContextImpl implements RoutingContext {
Boolean reusable = null;
Boolean internalOnly = null;
volatile int version;
private final Executor executor;
@ -95,6 +101,11 @@ public class RoutingContextImpl implements RoutingContext {
return reusable != null && reusable;
}
@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
}
@Override
public int getPreviousBindingsVersion() {
return version;
@ -138,6 +149,8 @@ public class RoutingContextImpl implements RoutingContext {
this.reusable = null;
this.internalOnly = null;
return this;
}
@ -163,6 +176,13 @@ public class RoutingContextImpl implements RoutingContext {
listing.getNonDurableQueues().add(queue);
}
if (internalOnly == null) {
internalOnly = true;
}
// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();
queueCount++;
}
@ -198,6 +218,16 @@ public class RoutingContextImpl implements RoutingContext {
}
}
@Override
public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.loadBalancingType = messageLoadBalancingType;
return this;
}
@Override
public MessageLoadBalancingType getLoadBalancingType() {
return loadBalancingType;
}
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {

View File

@ -0,0 +1,945 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.junit.Assert;
import org.junit.Test;
public class RoutingContextTest {
private static class FakeQueueForRoutingContextTest implements Queue {
final String name;
final boolean isInternal;
final boolean durable;
FakeQueueForRoutingContextTest(String name, boolean isInternal, boolean durable) {
this.name = name;
this.isInternal = isInternal;
this.durable = durable;
}
@Override
public CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
@Override
public CriticalCloseable measureCritical(int path) {
return null;
}
@Override
public boolean checkExpiration(long timeout, boolean reset) {
return false;
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
}
@Override
public void routeWithAck(Message message, RoutingContext context) throws Exception {
}
@Override
public SimpleString getName() {
return SimpleString.toSimpleString(name);
}
@Override
public Long getID() {
return null;
}
@Override
public Filter getFilter() {
return null;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public PagingStore getPagingStore() {
return null;
}
@Override
public PageSubscription getPageSubscription() {
return null;
}
@Override
public RoutingType getRoutingType() {
return null;
}
@Override
public void setRoutingType(RoutingType routingType) {
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public boolean isDurable() {
return durable;
}
@Override
public int durableUp(Message message) {
return 0;
}
@Override
public int durableDown(Message message) {
return 0;
}
@Override
public void refUp(MessageReference messageReference) {
}
@Override
public void refDown(MessageReference messageReference) {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
return null;
}
@Override
public boolean isDurableMessage() {
return false;
}
@Override
public boolean isAutoDelete() {
return false;
}
@Override
public long getAutoDeleteDelay() {
return 0;
}
@Override
public long getAutoDeleteMessageCount() {
return 0;
}
@Override
public boolean isTemporary() {
return false;
}
@Override
public boolean isAutoCreated() {
return false;
}
@Override
public boolean isPurgeOnNoConsumers() {
return false;
}
@Override
public void setPurgeOnNoConsumers(boolean value) {
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean value) {
}
@Override
public int getConsumersBeforeDispatch() {
return 0;
}
@Override
public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
}
@Override
public long getDelayBeforeDispatch() {
return 0;
}
@Override
public void setDelayBeforeDispatch(long delayBeforeDispatch) {
}
@Override
public long getDispatchStartTime() {
return 0;
}
@Override
public boolean isDispatching() {
return false;
}
@Override
public void setDispatching(boolean dispatching) {
}
@Override
public boolean isExclusive() {
return false;
}
@Override
public void setExclusive(boolean value) {
}
@Override
public boolean isLastValue() {
return false;
}
@Override
public SimpleString getLastValueKey() {
return null;
}
@Override
public boolean isNonDestructive() {
return false;
}
@Override
public void setNonDestructive(boolean nonDestructive) {
}
@Override
public int getMaxConsumers() {
return 0;
}
@Override
public void setMaxConsumer(int maxConsumers) {
}
@Override
public int getGroupBuckets() {
return 0;
}
@Override
public void setGroupBuckets(int groupBuckets) {
}
@Override
public boolean isGroupRebalance() {
return false;
}
@Override
public void setGroupRebalance(boolean groupRebalance) {
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return false;
}
@Override
public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
}
@Override
public SimpleString getGroupFirstKey() {
return null;
}
@Override
public void setGroupFirstKey(SimpleString groupFirstKey) {
}
@Override
public boolean isConfigurationManaged() {
return false;
}
@Override
public void setConfigurationManaged(boolean configurationManaged) {
}
@Override
public void addConsumer(Consumer consumer) throws Exception {
}
@Override
public void addLingerSession(String sessionId) {
}
@Override
public void removeLingerSession(String sessionId) {
}
@Override
public void removeConsumer(Consumer consumer) {
}
@Override
public int getConsumerCount() {
return 0;
}
@Override
public long getConsumerRemovedTimestamp() {
return 0;
}
@Override
public void setRingSize(long ringSize) {
}
@Override
public long getRingSize() {
return 0;
}
@Override
public ReferenceCounter getConsumersRefCount() {
return null;
}
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
}
@Override
public void reload(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref, boolean direct) {
}
@Override
public void addHead(MessageReference ref, boolean scheduling) {
}
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> refs, boolean scheduling) {
}
@Override
public void acknowledge(MessageReference ref) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
}
@Override
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void cancel(Transaction tx, MessageReference ref) {
}
@Override
public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
}
@Override
public void cancel(MessageReference reference, long timeBase) throws Exception {
}
@Override
public void deliverAsync() {
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public void forceDelivery() {
}
@Override
public void deleteQueue() throws Exception {
}
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
}
@Override
public void removeAddress() throws Exception {
}
@Override
public void destroyPaging() throws Exception {
}
@Override
public long getMessageCount() {
return 0;
}
@Override
public long getPersistentSize() {
return 0;
}
@Override
public long getDurableMessageCount() {
return 0;
}
@Override
public long getDurablePersistentSize() {
return 0;
}
@Override
public int getDeliveringCount() {
return 0;
}
@Override
public long getDeliveringSize() {
return 0;
}
@Override
public int getDurableDeliveringCount() {
return 0;
}
@Override
public long getDurableDeliveringSize() {
return 0;
}
@Override
public void referenceHandled(MessageReference ref) {
}
@Override
public int getScheduledCount() {
return 0;
}
@Override
public long getScheduledSize() {
return 0;
}
@Override
public int getDurableScheduledCount() {
return 0;
}
@Override
public long getDurableScheduledSize() {
return 0;
}
@Override
public List<MessageReference> getScheduledMessages() {
return null;
}
@Override
public Map<String, List<MessageReference>> getDeliveringMessages() {
return null;
}
@Override
public long getMessagesAdded() {
return 0;
}
@Override
public long getAcknowledgeAttempts() {
return 0;
}
@Override
public long getMessagesAcknowledged() {
return 0;
}
@Override
public long getMessagesExpired() {
return 0;
}
@Override
public long getMessagesKilled() {
return 0;
}
@Override
public long getMessagesReplaced() {
return 0;
}
@Override
public MessageReference removeReferenceWithID(long id) throws Exception {
return null;
}
@Override
public MessageReference getReference(long id) throws ActiveMQException {
return null;
}
@Override
public int deleteAllReferences() throws Exception {
return 0;
}
@Override
public int deleteAllReferences(int flushLimit) throws Exception {
return 0;
}
@Override
public boolean deleteReference(long messageID) throws Exception {
return false;
}
@Override
public int deleteMatchingReferences(Filter filter) throws Exception {
return 0;
}
@Override
public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception {
return 0;
}
@Override
public boolean expireReference(long messageID) throws Exception {
return false;
}
@Override
public int expireReferences(Filter filter) throws Exception {
return 0;
}
@Override
public void expireReferences(Runnable done) {
}
@Override
public void expire(MessageReference ref) throws Exception {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
return false;
}
@Override
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
return 0;
}
@Override
public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
}
@Override
public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception {
return false;
}
@Override
public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception {
return 0;
}
@Override
public boolean moveReference(long messageID,
SimpleString toAddress,
Binding binding,
boolean rejectDuplicates) throws Exception {
return false;
}
@Override
public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
int messageCount,
Binding binding) throws Exception {
return 0;
}
@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
}
@Override
public void addRedistributor(long delay) {
}
@Override
public void cancelRedistributor() {
}
@Override
public boolean hasMatchingConsumer(Message message) {
return false;
}
@Override
public Collection<Consumer> getConsumers() {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
long timeBase,
boolean ignoreRedeliveryDelay) throws Exception {
return null;
}
@Override
public LinkedListIterator<MessageReference> iterator() {
return null;
}
@Override
public LinkedListIterator<MessageReference> browserIterator() {
return null;
}
@Override
public SimpleString getExpiryAddress() {
return null;
}
@Override
public SimpleString getDeadLetterAddress() {
return null;
}
@Override
public void pause() {
}
@Override
public void pause(boolean persist) {
}
@Override
public void reloadPause(long recordID) {
}
@Override
public void resume() {
}
@Override
public boolean isPaused() {
return false;
}
@Override
public boolean isPersistedPause() {
return false;
}
@Override
public Executor getExecutor() {
return null;
}
@Override
public void resetAllIterators() {
}
@Override
public boolean flushExecutor() {
return false;
}
@Override
public void close() throws Exception {
}
@Override
public boolean isDirectDeliver() {
return false;
}
@Override
public SimpleString getAddress() {
return SimpleString.toSimpleString(name);
}
@Override
public boolean isInternalQueue() {
return isInternal;
}
@Override
public void setInternalQueue(boolean internalQueue) {
}
@Override
public void resetMessagesAdded() {
}
@Override
public void resetMessagesAcknowledged() {
}
@Override
public void resetMessagesExpired() {
}
@Override
public void resetMessagesKilled() {
}
@Override
public void incrementMesssagesAdded() {
}
@Override
public void deliverScheduledMessages() throws ActiveMQException {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
}
@Override
public SimpleString getUser() {
return null;
}
@Override
public void setUser(SimpleString user) {
}
@Override
public void recheckRefCount(OperationContext context) {
}
}
@Test
public void testValidateInternal() {
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager()));
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t2"), new FakeQueueForRoutingContextTest("t2", false, true));
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
Assert.assertFalse(context.isInternal());
context.clear();
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t2"), new FakeQueueForRoutingContextTest("t2", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
Assert.assertTrue(context.isInternal());
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Test;
import java.util.Collections;
public class AMQPClusterReplicaTest extends AmqpClientTestSupport {
protected static final int NODE_1_PORT = 5673;
protected static final int NODE_2_PORT = 5674;
@Test
public void testReplicaWithCluster() throws Exception {
ActiveMQServer node_1 = createNode1(MessageLoadBalancingType.ON_DEMAND);
ActiveMQServer node_2 = createNode2(MessageLoadBalancingType.ON_DEMAND);
server.start();
// Set node_1 mirror to target
node_1.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)));
node_1.start();
node_2.start();
configureAddressAndQueue(node_1);
configureAddressAndQueue(node_2);
waitForTopology(node_1, 2);
waitForTopology(node_2, 2);
{
// sender
ClientSessionFactory sessionFactory = addSessionFactory(getNode1ServerLocator().createSessionFactory());
ClientSession session = addClientSession(sessionFactory.createSession());
sendMessages(session, addClientProducer(session.createProducer("test")), 10);
}
{
// receiver
ClientSessionFactory sessionFactory = addSessionFactory(getNode2ServerLocator().createSessionFactory());
ClientSession session = addClientSession(sessionFactory.createSession());
session.start();
receiveMessages(addClientConsumer(session.createConsumer("test")), 0, 10, true);
}
// Wait to mirror target to read all messages
Wait.waitFor(() -> node_1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror").getMessageCount() == 0);
// Expect no messages in mirrored test queue
Wait.assertEquals(0, () -> server.locateQueue("test").getMessageCount());
}
@Test
public void testReplicaWithClusterTargetStrict() throws Exception {
ActiveMQServer node_1 = createNode1(MessageLoadBalancingType.STRICT);
ActiveMQServer node_2 = createNode2(MessageLoadBalancingType.STRICT);
server.stop();
// Set node_1 mirror to target
server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("mirror1", "tcp://localhost:" + NODE_1_PORT).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)));
server.start();
node_1.start();
node_2.start();
configureAddressAndQueue(node_1);
configureAddressAndQueue(server);
configureAddressAndQueue(node_2);
waitForTopology(node_1, 2);
waitForTopology(node_2, 2);
{
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello"));
}
org.apache.activemq.artemis.core.server.Queue mainServerQueue = server.locateQueue("test");
org.apache.activemq.artemis.core.server.Queue node1Queue = node_1.locateQueue("test");
org.apache.activemq.artemis.core.server.Queue node2Queue = node_2.locateQueue("test");
Wait.assertEquals(10L, node1Queue::getMessageCount, 5000, 10);
}
}
private ServerLocator getNode1ServerLocator() throws Exception {
return addServerLocator(ActiveMQClient.createServerLocator("tcp://localhost:" + NODE_1_PORT));
}
private ServerLocator getNode2ServerLocator() throws Exception {
return addServerLocator(ActiveMQClient.createServerLocator("tcp://localhost:" + NODE_2_PORT));
}
private ActiveMQServer createNode1(MessageLoadBalancingType loadBalancingType) throws Exception {
ActiveMQServer node_1 = createServer(NODE_1_PORT, false);
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("node1").setMessageLoadBalancingType(loadBalancingType).setStaticConnectors(Collections.singletonList("node2"));
node_1.setIdentity("node_1");
node_1.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("node1", "tcp://localhost:" + NODE_1_PORT).addConnectorConfiguration("node2", "tcp://localhost:" + NODE_2_PORT).addClusterConfiguration(clusterConfiguration);
return node_1;
}
private ActiveMQServer createNode2(MessageLoadBalancingType loadBalancingType) throws Exception {
ActiveMQServer node_2 = createServer(NODE_2_PORT, false);
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("node2").setMessageLoadBalancingType(loadBalancingType).setStaticConnectors(Collections.singletonList("node1"));
node_2.setIdentity("node_2");
node_2.getConfiguration().setName("node_2").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("node1", "tcp://localhost:" + NODE_1_PORT).addConnectorConfiguration("node2", "tcp://localhost:" + NODE_2_PORT).addClusterConfiguration(clusterConfiguration);
return node_2;
}
private void configureAddressAndQueue(ActiveMQServer node) throws Exception {
node.addAddressInfo(new AddressInfo("test").setAutoCreated(false));
node.getAddressSettingsRepository().addMatch("test", new AddressSettings().setRedistributionDelay(0));
node.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST).setDurable(true));
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
}