This commit is contained in:
Clebert Suconic 2022-04-23 10:15:00 -04:00
commit a38fae1fbd
7 changed files with 1176 additions and 6 deletions

View File

@ -201,6 +201,13 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return;
}
if (context.isInternal()) {
if (logger.isTraceEnabled()) {
logger.trace("server " + server + " is discarding send to avoid sending to internal queue");
}
return;
}
if (logger.isTraceEnabled()) {
logger.trace(server + " send message " + message);
}

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -459,7 +460,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
routingContext.clear().setMirrorSource(this);
routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
server.getPostOffice().route(message, routingContext, false);
// We use this as part of a transaction because of the duplicate detection cache that needs to be done atomically
transaction.commit();

View File

@ -325,6 +325,14 @@ public final class BindingsImpl implements Bindings {
}
}
private MessageLoadBalancingType getMessageLoadBalancingType(RoutingContext context) {
if (context.getLoadBalancingType() != null) {
return context.getLoadBalancingType();
} else {
return this.messageLoadBalancingType;
}
}
private void simpleRouting(final Message message,
final RoutingContext context,
final int currentVersion) throws Exception {
@ -333,7 +341,7 @@ public final class BindingsImpl implements Bindings {
}
routingNameBindingMap.forEachBindings((bindings, nextPosition) -> {
final Binding nextBinding = getNextBinding(message, bindings, nextPosition);
final Binding nextBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (nextBinding != null && nextBinding.getFilter() == null && nextBinding.isLocal() && bindings.length == 1) {
context.setReusable(true, currentVersion);
} else {
@ -362,7 +370,8 @@ public final class BindingsImpl implements Bindings {
*/
private Binding getNextBinding(final Message message,
final Binding[] bindings,
final CopyOnWriteBindings.BindingIndex bindingIndex) {
final CopyOnWriteBindings.BindingIndex bindingIndex,
final MessageLoadBalancingType loadBalancingType) {
int nextPosition = bindingIndex.getIndex();
final int bindingsCount = bindings.length;
@ -373,8 +382,6 @@ public final class BindingsImpl implements Bindings {
Binding nextBinding = null;
int lastLowPriorityBinding = -1;
// snapshot this, to save loading it on each iteration
final MessageLoadBalancingType loadBalancingType = this.messageLoadBalancingType;
for (int i = 0; i < bindingsCount; i++) {
final Binding binding = bindings[nextPosition];
@ -438,7 +445,7 @@ public final class BindingsImpl implements Bindings {
if (resp == null) {
// ok let's find the next binding to propose
Binding theBinding = getNextBinding(message, bindings, nextPosition);
Binding theBinding = getNextBinding(message, bindings, nextPosition, getMessageLoadBalancingType(context));
if (theBinding == null) {
return;
}

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
@ -41,6 +42,9 @@ public interface RoutingContext {
* to avoid*/
boolean isMirrorController();
/** return true if every queue routed is internal */
boolean isInternal();
MirrorController getMirrorSource();
RoutingContext setMirrorSource(MirrorController mirrorController);
@ -95,5 +99,9 @@ public interface RoutingContext {
RoutingContext setDuplicateDetection(boolean value);
RoutingContext setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType);
MessageLoadBalancingType getLoadBalancingType();
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@ -52,6 +53,9 @@ public class RoutingContextImpl implements RoutingContext {
private RoutingType previousRoutingType;
// if we wanted to bypass the load balancing configured elsewhere
private MessageLoadBalancingType loadBalancingType;
/* To be set by the Mirror target on the server, to avoid ping pongs or reflections of messages between mirrors */
private MirrorController mirrorControllerSource;
@ -59,6 +63,8 @@ public class RoutingContextImpl implements RoutingContext {
Boolean reusable = null;
Boolean internalOnly = null;
volatile int version;
private final Executor executor;
@ -95,6 +101,11 @@ public class RoutingContextImpl implements RoutingContext {
return reusable != null && reusable;
}
@Override
public boolean isInternal() {
return internalOnly != null && internalOnly;
}
@Override
public int getPreviousBindingsVersion() {
return version;
@ -138,6 +149,8 @@ public class RoutingContextImpl implements RoutingContext {
this.reusable = null;
this.internalOnly = null;
return this;
}
@ -163,6 +176,13 @@ public class RoutingContextImpl implements RoutingContext {
listing.getNonDurableQueues().add(queue);
}
if (internalOnly == null) {
internalOnly = true;
}
// every queue added has to be internal only
internalOnly = internalOnly && queue.isInternalQueue();
queueCount++;
}
@ -198,6 +218,16 @@ public class RoutingContextImpl implements RoutingContext {
}
}
@Override
public RoutingContextImpl setLoadBalancingType(MessageLoadBalancingType messageLoadBalancingType) {
this.loadBalancingType = messageLoadBalancingType;
return this;
}
@Override
public MessageLoadBalancingType getLoadBalancingType() {
return loadBalancingType;
}
@Override
public void addQueueWithAck(SimpleString address, Queue queue) {

View File

@ -0,0 +1,945 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
import org.junit.Assert;
import org.junit.Test;
public class RoutingContextTest {
private static class FakeQueueForRoutingContextTest implements Queue {
final String name;
final boolean isInternal;
final boolean durable;
FakeQueueForRoutingContextTest(String name, boolean isInternal, boolean durable) {
this.name = name;
this.isInternal = isInternal;
this.durable = durable;
}
@Override
public CriticalAnalyzer getCriticalAnalyzer() {
return null;
}
@Override
public CriticalCloseable measureCritical(int path) {
return null;
}
@Override
public boolean checkExpiration(long timeout, boolean reset) {
return false;
}
@Override
public void route(Message message, RoutingContext context) throws Exception {
}
@Override
public void routeWithAck(Message message, RoutingContext context) throws Exception {
}
@Override
public SimpleString getName() {
return SimpleString.toSimpleString(name);
}
@Override
public Long getID() {
return null;
}
@Override
public Filter getFilter() {
return null;
}
@Override
public void setFilter(Filter filter) {
}
@Override
public PagingStore getPagingStore() {
return null;
}
@Override
public PageSubscription getPageSubscription() {
return null;
}
@Override
public RoutingType getRoutingType() {
return null;
}
@Override
public void setRoutingType(RoutingType routingType) {
}
@Override
public boolean allowsReferenceCallback() {
return false;
}
@Override
public boolean isDurable() {
return durable;
}
@Override
public int durableUp(Message message) {
return 0;
}
@Override
public int durableDown(Message message) {
return 0;
}
@Override
public void refUp(MessageReference messageReference) {
}
@Override
public void refDown(MessageReference messageReference) {
}
@Override
public MessageReference removeWithSuppliedID(String serverID, long id, NodeStore<MessageReference> nodeStore) {
return null;
}
@Override
public boolean isDurableMessage() {
return false;
}
@Override
public boolean isAutoDelete() {
return false;
}
@Override
public long getAutoDeleteDelay() {
return 0;
}
@Override
public long getAutoDeleteMessageCount() {
return 0;
}
@Override
public boolean isTemporary() {
return false;
}
@Override
public boolean isAutoCreated() {
return false;
}
@Override
public boolean isPurgeOnNoConsumers() {
return false;
}
@Override
public void setPurgeOnNoConsumers(boolean value) {
}
@Override
public boolean isEnabled() {
return false;
}
@Override
public void setEnabled(boolean value) {
}
@Override
public int getConsumersBeforeDispatch() {
return 0;
}
@Override
public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
}
@Override
public long getDelayBeforeDispatch() {
return 0;
}
@Override
public void setDelayBeforeDispatch(long delayBeforeDispatch) {
}
@Override
public long getDispatchStartTime() {
return 0;
}
@Override
public boolean isDispatching() {
return false;
}
@Override
public void setDispatching(boolean dispatching) {
}
@Override
public boolean isExclusive() {
return false;
}
@Override
public void setExclusive(boolean value) {
}
@Override
public boolean isLastValue() {
return false;
}
@Override
public SimpleString getLastValueKey() {
return null;
}
@Override
public boolean isNonDestructive() {
return false;
}
@Override
public void setNonDestructive(boolean nonDestructive) {
}
@Override
public int getMaxConsumers() {
return 0;
}
@Override
public void setMaxConsumer(int maxConsumers) {
}
@Override
public int getGroupBuckets() {
return 0;
}
@Override
public void setGroupBuckets(int groupBuckets) {
}
@Override
public boolean isGroupRebalance() {
return false;
}
@Override
public void setGroupRebalance(boolean groupRebalance) {
}
@Override
public boolean isGroupRebalancePauseDispatch() {
return false;
}
@Override
public void setGroupRebalancePauseDispatch(boolean groupRebalancePauseDisptach) {
}
@Override
public SimpleString getGroupFirstKey() {
return null;
}
@Override
public void setGroupFirstKey(SimpleString groupFirstKey) {
}
@Override
public boolean isConfigurationManaged() {
return false;
}
@Override
public void setConfigurationManaged(boolean configurationManaged) {
}
@Override
public void addConsumer(Consumer consumer) throws Exception {
}
@Override
public void addLingerSession(String sessionId) {
}
@Override
public void removeLingerSession(String sessionId) {
}
@Override
public void removeConsumer(Consumer consumer) {
}
@Override
public int getConsumerCount() {
return 0;
}
@Override
public long getConsumerRemovedTimestamp() {
return 0;
}
@Override
public void setRingSize(long ringSize) {
}
@Override
public long getRingSize() {
return 0;
}
@Override
public ReferenceCounter getConsumersRefCount() {
return null;
}
@Override
public void addSorted(List<MessageReference> refs, boolean scheduling) {
}
@Override
public void reload(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref, boolean direct) {
}
@Override
public void addHead(MessageReference ref, boolean scheduling) {
}
@Override
public void addSorted(MessageReference ref, boolean scheduling) {
}
@Override
public void addHead(List<MessageReference> refs, boolean scheduling) {
}
@Override
public void acknowledge(MessageReference ref) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void acknowledge(Transaction tx,
MessageReference ref,
AckReason reason,
ServerConsumer consumer) throws Exception {
}
@Override
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void cancel(Transaction tx, MessageReference ref) {
}
@Override
public void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck) {
}
@Override
public void cancel(MessageReference reference, long timeBase) throws Exception {
}
@Override
public void deliverAsync() {
}
@Override
public void unproposed(SimpleString groupID) {
}
@Override
public void forceDelivery() {
}
@Override
public void deleteQueue() throws Exception {
}
@Override
public void deleteQueue(boolean removeConsumers) throws Exception {
}
@Override
public void removeAddress() throws Exception {
}
@Override
public void destroyPaging() throws Exception {
}
@Override
public long getMessageCount() {
return 0;
}
@Override
public long getPersistentSize() {
return 0;
}
@Override
public long getDurableMessageCount() {
return 0;
}
@Override
public long getDurablePersistentSize() {
return 0;
}
@Override
public int getDeliveringCount() {
return 0;
}
@Override
public long getDeliveringSize() {
return 0;
}
@Override
public int getDurableDeliveringCount() {
return 0;
}
@Override
public long getDurableDeliveringSize() {
return 0;
}
@Override
public void referenceHandled(MessageReference ref) {
}
@Override
public int getScheduledCount() {
return 0;
}
@Override
public long getScheduledSize() {
return 0;
}
@Override
public int getDurableScheduledCount() {
return 0;
}
@Override
public long getDurableScheduledSize() {
return 0;
}
@Override
public List<MessageReference> getScheduledMessages() {
return null;
}
@Override
public Map<String, List<MessageReference>> getDeliveringMessages() {
return null;
}
@Override
public long getMessagesAdded() {
return 0;
}
@Override
public long getAcknowledgeAttempts() {
return 0;
}
@Override
public long getMessagesAcknowledged() {
return 0;
}
@Override
public long getMessagesExpired() {
return 0;
}
@Override
public long getMessagesKilled() {
return 0;
}
@Override
public long getMessagesReplaced() {
return 0;
}
@Override
public MessageReference removeReferenceWithID(long id) throws Exception {
return null;
}
@Override
public MessageReference getReference(long id) throws ActiveMQException {
return null;
}
@Override
public int deleteAllReferences() throws Exception {
return 0;
}
@Override
public int deleteAllReferences(int flushLimit) throws Exception {
return 0;
}
@Override
public boolean deleteReference(long messageID) throws Exception {
return false;
}
@Override
public int deleteMatchingReferences(Filter filter) throws Exception {
return 0;
}
@Override
public int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception {
return 0;
}
@Override
public boolean expireReference(long messageID) throws Exception {
return false;
}
@Override
public int expireReferences(Filter filter) throws Exception {
return 0;
}
@Override
public void expireReferences(Runnable done) {
}
@Override
public void expire(MessageReference ref) throws Exception {
}
@Override
public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
}
@Override
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception {
return false;
}
@Override
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception {
return 0;
}
@Override
public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
return false;
}
@Override
public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception {
return false;
}
@Override
public int changeReferencesPriority(Filter filter, byte newPriority) throws Exception {
return 0;
}
@Override
public boolean moveReference(long messageID,
SimpleString toAddress,
Binding binding,
boolean rejectDuplicates) throws Exception {
return false;
}
@Override
public int moveReferences(Filter filter, SimpleString toAddress, Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
Binding binding) throws Exception {
return 0;
}
@Override
public int moveReferences(int flushLimit,
Filter filter,
SimpleString toAddress,
boolean rejectDuplicates,
int messageCount,
Binding binding) throws Exception {
return 0;
}
@Override
public int retryMessages(Filter filter) throws Exception {
return 0;
}
@Override
public void addRedistributor(long delay) {
}
@Override
public void cancelRedistributor() {
}
@Override
public boolean hasMatchingConsumer(Message message) {
return false;
}
@Override
public Collection<Consumer> getConsumers() {
return null;
}
@Override
public Map<SimpleString, Consumer> getGroups() {
return null;
}
@Override
public void resetGroup(SimpleString groupID) {
}
@Override
public void resetAllGroups() {
}
@Override
public int getGroupCount() {
return 0;
}
@Override
public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
long timeBase,
boolean ignoreRedeliveryDelay) throws Exception {
return null;
}
@Override
public LinkedListIterator<MessageReference> iterator() {
return null;
}
@Override
public LinkedListIterator<MessageReference> browserIterator() {
return null;
}
@Override
public SimpleString getExpiryAddress() {
return null;
}
@Override
public SimpleString getDeadLetterAddress() {
return null;
}
@Override
public void pause() {
}
@Override
public void pause(boolean persist) {
}
@Override
public void reloadPause(long recordID) {
}
@Override
public void resume() {
}
@Override
public boolean isPaused() {
return false;
}
@Override
public boolean isPersistedPause() {
return false;
}
@Override
public Executor getExecutor() {
return null;
}
@Override
public void resetAllIterators() {
}
@Override
public boolean flushExecutor() {
return false;
}
@Override
public void close() throws Exception {
}
@Override
public boolean isDirectDeliver() {
return false;
}
@Override
public SimpleString getAddress() {
return SimpleString.toSimpleString(name);
}
@Override
public boolean isInternalQueue() {
return isInternal;
}
@Override
public void setInternalQueue(boolean internalQueue) {
}
@Override
public void resetMessagesAdded() {
}
@Override
public void resetMessagesAcknowledged() {
}
@Override
public void resetMessagesExpired() {
}
@Override
public void resetMessagesKilled() {
}
@Override
public void incrementMesssagesAdded() {
}
@Override
public void deliverScheduledMessages() throws ActiveMQException {
}
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {
}
@Override
public SimpleString getUser() {
return null;
}
@Override
public void setUser(SimpleString user) {
}
@Override
public void recheckRefCount(OperationContext context) {
}
}
@Test
public void testValidateInternal() {
RoutingContext context = new RoutingContextImpl(new TransactionImpl(new NullStorageManager()));
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t2"), new FakeQueueForRoutingContextTest("t2", false, true));
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
Assert.assertFalse(context.isInternal());
context.clear();
Assert.assertFalse(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t1"), new FakeQueueForRoutingContextTest("t1", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t2"), new FakeQueueForRoutingContextTest("t2", true, true));
Assert.assertTrue(context.isInternal());
context.addQueue(SimpleString.toSimpleString("t3"), new FakeQueueForRoutingContextTest("t3", true, true));
Assert.assertTrue(context.isInternal());
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Test;
import java.util.Collections;
public class AMQPClusterReplicaTest extends AmqpClientTestSupport {
protected static final int NODE_1_PORT = 5673;
protected static final int NODE_2_PORT = 5674;
@Test
public void testReplicaWithCluster() throws Exception {
ActiveMQServer node_1 = createNode1(MessageLoadBalancingType.ON_DEMAND);
ActiveMQServer node_2 = createNode2(MessageLoadBalancingType.ON_DEMAND);
server.start();
// Set node_1 mirror to target
node_1.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)));
node_1.start();
node_2.start();
configureAddressAndQueue(node_1);
configureAddressAndQueue(node_2);
waitForTopology(node_1, 2);
waitForTopology(node_2, 2);
{
// sender
ClientSessionFactory sessionFactory = addSessionFactory(getNode1ServerLocator().createSessionFactory());
ClientSession session = addClientSession(sessionFactory.createSession());
sendMessages(session, addClientProducer(session.createProducer("test")), 10);
}
{
// receiver
ClientSessionFactory sessionFactory = addSessionFactory(getNode2ServerLocator().createSessionFactory());
ClientSession session = addClientSession(sessionFactory.createSession());
session.start();
receiveMessages(addClientConsumer(session.createConsumer("test")), 0, 10, true);
}
// Wait to mirror target to read all messages
Wait.waitFor(() -> node_1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_mirror").getMessageCount() == 0);
// Expect no messages in mirrored test queue
Wait.assertEquals(0, () -> server.locateQueue("test").getMessageCount());
}
@Test
public void testReplicaWithClusterTargetStrict() throws Exception {
ActiveMQServer node_1 = createNode1(MessageLoadBalancingType.STRICT);
ActiveMQServer node_2 = createNode2(MessageLoadBalancingType.STRICT);
server.stop();
// Set node_1 mirror to target
server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("mirror1", "tcp://localhost:" + NODE_1_PORT).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true)));
server.start();
node_1.start();
node_2.start();
configureAddressAndQueue(node_1);
configureAddressAndQueue(server);
configureAddressAndQueue(node_2);
waitForTopology(node_1, 2);
waitForTopology(node_2, 2);
{
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("test");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("hello"));
}
org.apache.activemq.artemis.core.server.Queue mainServerQueue = server.locateQueue("test");
org.apache.activemq.artemis.core.server.Queue node1Queue = node_1.locateQueue("test");
org.apache.activemq.artemis.core.server.Queue node2Queue = node_2.locateQueue("test");
Wait.assertEquals(10L, node1Queue::getMessageCount, 5000, 10);
}
}
private ServerLocator getNode1ServerLocator() throws Exception {
return addServerLocator(ActiveMQClient.createServerLocator("tcp://localhost:" + NODE_1_PORT));
}
private ServerLocator getNode2ServerLocator() throws Exception {
return addServerLocator(ActiveMQClient.createServerLocator("tcp://localhost:" + NODE_2_PORT));
}
private ActiveMQServer createNode1(MessageLoadBalancingType loadBalancingType) throws Exception {
ActiveMQServer node_1 = createServer(NODE_1_PORT, false);
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("node1").setMessageLoadBalancingType(loadBalancingType).setStaticConnectors(Collections.singletonList("node2"));
node_1.setIdentity("node_1");
node_1.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("node1", "tcp://localhost:" + NODE_1_PORT).addConnectorConfiguration("node2", "tcp://localhost:" + NODE_2_PORT).addClusterConfiguration(clusterConfiguration);
return node_1;
}
private ActiveMQServer createNode2(MessageLoadBalancingType loadBalancingType) throws Exception {
ActiveMQServer node_2 = createServer(NODE_2_PORT, false);
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("node2").setMessageLoadBalancingType(loadBalancingType).setStaticConnectors(Collections.singletonList("node1"));
node_2.setIdentity("node_2");
node_2.getConfiguration().setName("node_2").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("node1", "tcp://localhost:" + NODE_1_PORT).addConnectorConfiguration("node2", "tcp://localhost:" + NODE_2_PORT).addClusterConfiguration(clusterConfiguration);
return node_2;
}
private void configureAddressAndQueue(ActiveMQServer node) throws Exception {
node.addAddressInfo(new AddressInfo("test").setAutoCreated(false));
node.getAddressSettingsRepository().addMatch("test", new AddressSettings().setRedistributionDelay(0));
node.createQueue(new QueueConfiguration("test").setAddress("test").setRoutingType(RoutingType.ANYCAST).setDurable(true));
}
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
}