ARTEMIS-4247 BrokerClustering vs Mirror code improvements
This commit is contained in:
parent
0eefc38d93
commit
fea84e39f5
|
@ -32,9 +32,9 @@ import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.MirrorOption;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
import org.apache.activemq.artemis.core.server.impl.AckReason;
|
||||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||||
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||||
|
@ -77,11 +77,9 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
||||||
public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
|
public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
|
||||||
public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
|
public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
|
||||||
|
|
||||||
/** When a clustered node (from regular cluster connections) receives a message
|
/* In a Multi-cast address (or JMS Topics) we may in certain cases (clustered-routing for instance)
|
||||||
it will have target queues associated with it
|
select which particular queues will receive the routing output */
|
||||||
this could be from message redistribution or simply load balancing.
|
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-mr-trg-q");
|
||||||
an that case this will have the queue associated with it */
|
|
||||||
public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");
|
|
||||||
|
|
||||||
// Capabilities
|
// Capabilities
|
||||||
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
|
public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
|
||||||
|
@ -90,7 +88,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
||||||
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
|
public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
|
||||||
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
|
public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
|
||||||
|
|
||||||
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));
|
private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null));
|
||||||
|
|
||||||
final Queue snfQueue;
|
final Queue snfQueue;
|
||||||
final ActiveMQServer server;
|
final ActiveMQServer server;
|
||||||
|
@ -614,7 +612,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
|
||||||
public static void route(ActiveMQServer server, Message message) throws Exception {
|
public static void route(ActiveMQServer server, Message message) throws Exception {
|
||||||
message.setMessageID(server.getStorageManager().generateID());
|
message.setMessageID(server.getStorageManager().generateID());
|
||||||
RoutingContext ctx = mirrorControlRouting.get();
|
RoutingContext ctx = mirrorControlRouting.get();
|
||||||
ctx.clear();
|
ctx.clear().setMirrorOption(MirrorOption.disabled);
|
||||||
server.getPostOffice().route(message, ctx, false);
|
server.getPostOffice().route(message, ctx, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,7 +59,7 @@ public interface Bindings extends UnproposalListener {
|
||||||
|
|
||||||
boolean allowRedistribute();
|
boolean allowRedistribute();
|
||||||
|
|
||||||
void forEach(BiConsumer<SimpleString, Binding> bindingConsumer);
|
void forEach(BiConsumer<String, Binding> bindingConsumer);
|
||||||
|
|
||||||
int size();
|
int size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,9 +39,9 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.MirrorOption;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||||
|
@ -69,7 +69,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
* This is the same as bindingsIdMap but indexed on the binding's uniqueName rather than ID. Two maps are
|
* This is the same as bindingsIdMap but indexed on the binding's uniqueName rather than ID. Two maps are
|
||||||
* maintained to speed routing, otherwise we'd have to loop through the bindingsIdMap when routing to an FQQN.
|
* maintained to speed routing, otherwise we'd have to loop through the bindingsIdMap when routing to an FQQN.
|
||||||
*/
|
*/
|
||||||
private final Map<SimpleString, Binding> bindingsNameMap = new ConcurrentHashMap<>();
|
private final Map<String, Binding> bindingsNameMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private final Set<Binding> exclusiveBindings = new CopyOnWriteArraySet<>();
|
private final Set<Binding> exclusiveBindings = new CopyOnWriteArraySet<>();
|
||||||
|
|
||||||
|
@ -123,7 +123,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Binding getBinding(String name) {
|
public Binding getBinding(String name) {
|
||||||
return bindingsNameMap.get(SimpleString.toSimpleString(name));
|
return bindingsNameMap.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -138,7 +138,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
}
|
}
|
||||||
|
|
||||||
bindingsIdMap.put(binding.getID(), binding);
|
bindingsIdMap.put(binding.getID(), binding);
|
||||||
bindingsNameMap.put(binding.getUniqueName(), binding);
|
bindingsNameMap.put(String.valueOf(binding.getUniqueName()), binding);
|
||||||
|
|
||||||
if (binding instanceof RemoteQueueBinding) {
|
if (binding instanceof RemoteQueueBinding) {
|
||||||
setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
|
setMessageLoadBalancingType(((RemoteQueueBinding) binding).getMessageLoadBalancingType());
|
||||||
|
@ -162,7 +162,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
|
public Binding removeBindingByUniqueName(final SimpleString bindingUniqueName) {
|
||||||
final Binding binding = bindingsNameMap.remove(bindingUniqueName);
|
final Binding binding = bindingsNameMap.remove(String.valueOf(bindingUniqueName));
|
||||||
if (binding == null) {
|
if (binding == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -174,7 +174,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
}
|
}
|
||||||
|
|
||||||
bindingsIdMap.remove(binding.getID());
|
bindingsIdMap.remove(binding.getID());
|
||||||
assert !bindingsNameMap.containsKey(binding.getUniqueName());
|
assert !bindingsNameMap.containsKey(String.valueOf(binding.getUniqueName()));
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Removing binding {} from {} bindingTable: {}", binding, this, debugBindings());
|
logger.trace("Removing binding {} from {} bindingTable: {}", binding, this, debugBindings());
|
||||||
|
@ -191,7 +191,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
|
public void forEach(BiConsumer<String, Binding> bindingConsumer) {
|
||||||
bindingsNameMap.forEach(bindingConsumer);
|
bindingsNameMap.forEach(bindingConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -313,7 +313,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
|
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
|
||||||
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
|
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
|
||||||
context.clear().setReusable(false);
|
context.clear().setReusable(false);
|
||||||
final Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
|
final Binding theBinding = bindingsNameMap.get(String.valueOf(CompositeAddress.extractQueueName(message.getAddressSimpleString())));
|
||||||
if (theBinding != null) {
|
if (theBinding != null) {
|
||||||
theBinding.route(message, context);
|
theBinding.route(message, context);
|
||||||
}
|
}
|
||||||
|
@ -601,6 +601,7 @@ public final class BindingsImpl implements Bindings {
|
||||||
final byte[] ids) throws Exception {
|
final byte[] ids) throws Exception {
|
||||||
if (!context.isMirrorDisabled()) {
|
if (!context.isMirrorDisabled()) {
|
||||||
context.setMirrorOption(MirrorOption.individualRoute);
|
context.setMirrorOption(MirrorOption.individualRoute);
|
||||||
|
context.setReusable(false);
|
||||||
}
|
}
|
||||||
byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);
|
byte[] idsToAck = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_ACK_IDS);
|
||||||
|
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.artemis.core.server;
|
|
||||||
|
|
||||||
/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
|
|
||||||
* */
|
|
||||||
public enum MirrorOption {
|
|
||||||
enabled,
|
|
||||||
disabled,
|
|
||||||
individualRoute
|
|
||||||
}
|
|
|
@ -39,8 +39,6 @@ public interface RoutingContext {
|
||||||
*/
|
*/
|
||||||
boolean isReusable();
|
boolean isReusable();
|
||||||
|
|
||||||
/** If the routing is from MirrorController, we don't redo mirrorController
|
|
||||||
* to avoid*/
|
|
||||||
MirrorOption getMirrorOption();
|
MirrorOption getMirrorOption();
|
||||||
|
|
||||||
void forEachDurable(Consumer<Queue> consumer);
|
void forEachDurable(Consumer<Queue> consumer);
|
||||||
|
@ -114,5 +112,11 @@ public interface RoutingContext {
|
||||||
|
|
||||||
ServerSession getServerSession();
|
ServerSession getServerSession();
|
||||||
|
|
||||||
|
enum MirrorOption {
|
||||||
|
enabled,
|
||||||
|
disabled,
|
||||||
|
individualRoute
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,10 +82,10 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.HandleStatus;
|
import org.apache.activemq.artemis.core.server.HandleStatus;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.MirrorOption;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
|
||||||
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||||
|
@ -708,9 +708,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
||||||
|
|
||||||
this.server = server;
|
this.server = server;
|
||||||
|
|
||||||
if (queueConfiguration.isInternal()) {
|
|
||||||
this.internalQueue = queueConfiguration.isInternal();
|
this.internalQueue = queueConfiguration.isInternal();
|
||||||
}
|
|
||||||
|
|
||||||
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
|
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor, this);
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.function.Consumer;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.server.MirrorOption;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RouteContextList;
|
import org.apache.activemq.artemis.core.server.RouteContextList;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
@ -168,9 +167,7 @@ public class RoutingContextImpl implements RoutingContext {
|
||||||
|
|
||||||
this.internalOnly = null;
|
this.internalOnly = null;
|
||||||
|
|
||||||
if (mirrorOption == MirrorOption.individualRoute) {
|
|
||||||
mirrorOption = MirrorOption.enabled;
|
mirrorOption = MirrorOption.enabled;
|
||||||
}
|
|
||||||
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,11 +24,13 @@ import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
import javax.jms.Topic;
|
import javax.jms.Topic;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
|
import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.amqp.Symbol;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||||
|
@ -42,9 +44,9 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.MirrorOption;
|
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||||
|
import org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
|
||||||
|
@ -52,9 +54,14 @@ import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
|
||||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
|
||||||
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
|
import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
|
||||||
import org.apache.activemq.artemis.tests.util.CFUtil;
|
import org.apache.activemq.artemis.tests.util.CFUtil;
|
||||||
import org.apache.activemq.artemis.utils.Wait;
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||||
|
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -105,18 +112,22 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
|
|
||||||
|
|
||||||
server.setIdentity(name);
|
server.setIdentity(name);
|
||||||
server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
|
server.getConfiguration().setName("node").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
|
||||||
|
|
||||||
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
|
ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
|
||||||
server.getConfiguration().addClusterConfiguration(clusterConfiguration);
|
server.getConfiguration().addClusterConfiguration(clusterConfiguration);
|
||||||
|
|
||||||
if (mirrorPort > 0) {
|
if (mirrorPort > 0) {
|
||||||
server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
|
server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString(mirrorName(mirrorPort)))));
|
||||||
}
|
}
|
||||||
|
|
||||||
return server;
|
return server;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String mirrorName(int mirrorPort) {
|
||||||
|
return "$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueRedistributionAMQP() throws Exception {
|
public void testQueueRedistributionAMQP() throws Exception {
|
||||||
internalQueueRedistribution("AMQP");
|
internalQueueRedistribution("AMQP");
|
||||||
|
@ -158,7 +169,7 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
|
consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
|
||||||
} else {
|
} else {
|
||||||
place = "A2";
|
place = "A2";
|
||||||
consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
|
consumer = sessionA2.createConsumer(sessionA2.createQueue(QUEUE_NAME));
|
||||||
}
|
}
|
||||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||||
Assert.assertNotNull(message);
|
Assert.assertNotNull(message);
|
||||||
|
@ -225,7 +236,7 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
|
|
||||||
{
|
{
|
||||||
HashSet<String> subscriptionSet = new HashSet<>();
|
HashSet<String> subscriptionSet = new HashSet<>();
|
||||||
// making sure the queues created on a1 are propaged into b1
|
// making sure the queues created on a1 are propagated into b1
|
||||||
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
|
a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
|
||||||
logger.debug("{} = {}", n, b);
|
logger.debug("{} = {}", n, b);
|
||||||
if (b instanceof LocalQueueBinding) {
|
if (b instanceof LocalQueueBinding) {
|
||||||
|
@ -238,7 +249,7 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
subscriptionQueueName = subscriptionSet.iterator().next();
|
subscriptionQueueName = subscriptionSet.iterator().next();
|
||||||
}
|
}
|
||||||
|
|
||||||
// making sure the queues created on a2 are propaged into b2
|
// making sure the queues created on a2 are propagated into b2
|
||||||
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
|
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
|
||||||
logger.debug("{} = {}", n, b);
|
logger.debug("{} = {}", n, b);
|
||||||
if (b instanceof LocalQueueBinding) {
|
if (b instanceof LocalQueueBinding) {
|
||||||
|
@ -346,7 +357,7 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
||||||
|
|
||||||
List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
|
List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
|
||||||
// making sure the queues created on a2 are propaged into b2
|
// making sure the queues created on a2 are propagated into b2
|
||||||
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
|
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
|
||||||
if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
|
if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
|
||||||
logger.debug("{} = {}", a, b);
|
logger.debug("{} = {}", a, b);
|
||||||
|
@ -361,23 +372,138 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
|
||||||
Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
|
Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
|
||||||
directMessage.setAddress(TOPIC_NAME);
|
directMessage.setAddress(TOPIC_NAME);
|
||||||
directMessage.putStringProperty("Test", "t1");
|
directMessage.putStringProperty("Test", "t1");
|
||||||
|
|
||||||
|
// we will route a single message to subscription-0. a previous search found the RemoteBinding into remoteQueueBindins_a2;
|
||||||
remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
|
remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
|
||||||
a2.getPostOffice().processRoute(directMessage, routingContext, false);
|
a2.getPostOffice().processRoute(directMessage, routingContext, false);
|
||||||
routingContext.getTransaction().commit();
|
routingContext.getTransaction().commit();
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
String name = "my-topic-shared-subscription_" + i + ":global";
|
String name = "my-topic-shared-subscription_" + i + ":global";
|
||||||
Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
|
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
|
logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
|
||||||
logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
|
logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
|
||||||
logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
|
logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
|
||||||
logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
|
logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we routed to subscription-0 only, the outcome mirroring should only receive the output on subscription-0 on b1;
|
||||||
|
// When the routing happens after a clustered operation, mirror should be done individually to each routed queue.
|
||||||
|
// this test is validating that only subscription-0 got the message on both a1 and b1;
|
||||||
|
// notice that the initial route happened on a2, which then transfered the message towards a1.
|
||||||
|
// a1 made the copy to b1 through mirroring, and only subscription-0 should receive a message.
|
||||||
|
// which is exactly what should happen through message-redistribution in clustering
|
||||||
|
|
||||||
|
Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
|
||||||
Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
|
Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
|
||||||
Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
|
Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
|
||||||
Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
|
Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// This test is faking a MirrorSend.
|
||||||
|
// First it will send with an empty collection, then to a single queue
|
||||||
|
@Test
|
||||||
|
public void testFakeMirrorSend() throws Exception {
|
||||||
|
final String protocol = "AMQP";
|
||||||
|
String subscriptionName = "my-topic-shared-subscription";
|
||||||
|
|
||||||
|
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
|
||||||
|
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
|
||||||
|
|
||||||
|
Topic topic;
|
||||||
|
|
||||||
|
try (Connection conn = cfA1.createConnection()) {
|
||||||
|
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
topic = session.createTopic(TOPIC_NAME);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (Connection conn = cfA2.createConnection()) {
|
||||||
|
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
topic = session.createTopic(TOPIC_NAME);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
||||||
|
Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
||||||
|
Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
||||||
|
Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
|
||||||
|
|
||||||
|
List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
|
||||||
|
// making sure the queues created on a2 are propagated into b2
|
||||||
|
a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
|
||||||
|
if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
|
||||||
|
logger.debug("{} = {}", a, b);
|
||||||
|
remoteQueueBindings_a2.add((RemoteQueueBinding) b);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.assertEquals(1, remoteQueueBindings_a2.size());
|
||||||
|
AmqpConnection connection = createAmqpConnection(new URI("tcp://localhost:" + A_1_PORT));
|
||||||
|
runAfter(connection::close);
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setAddress(TOPIC_NAME);
|
||||||
|
// this is sending an empty ArrayList for the TARGET_QUEUES.
|
||||||
|
// no queues should be altered when there's an empty TARGET_QUEUES
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), new ArrayList<>());
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID()));
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new Symbol[]{Symbol.getSymbol("amq.mirror")});
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
String name = "my-topic-shared-subscription_" + i + ":global";
|
||||||
|
|
||||||
|
// all queues should be empty
|
||||||
|
// because the send to the mirror had an empty TARGET_QUEUES
|
||||||
|
Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
|
||||||
|
Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
|
||||||
|
Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
|
||||||
|
Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
message = new AmqpMessage();
|
||||||
|
message.setAddress(TOPIC_NAME);
|
||||||
|
ArrayList<String> singleQueue = new ArrayList<>();
|
||||||
|
singleQueue.add("my-topic-shared-subscription_3:global");
|
||||||
|
singleQueue.add("IDONTEXIST");
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), singleQueue);
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
|
||||||
|
message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID())); // simulating a node from b1, so it is not sent back to b1
|
||||||
|
|
||||||
|
sender.send(message);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
String name = "my-topic-shared-subscription_" + i + ":global";
|
||||||
|
|
||||||
|
if (i == 3) {
|
||||||
|
// only this queue, on this server should have received a message
|
||||||
|
// it shouldn't also be mirrored to its replica
|
||||||
|
Wait.assertEquals(1, a1.locateQueue(name)::getMessageCount);
|
||||||
|
} else {
|
||||||
|
Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
|
||||||
|
}
|
||||||
|
Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
|
||||||
|
Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
|
||||||
|
Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// This test has distinct subscriptions on each node and it is making sure the Mirror Routing is working accurately
|
// This test has distinct subscriptions on each node and it is making sure the Mirror Routing is working accurately
|
||||||
@Test
|
@Test
|
||||||
public void testMultiNodeSubscription() throws Exception {
|
public void testMultiNodeSubscription() throws Exception {
|
||||||
|
|
|
@ -434,7 +434,7 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
||||||
static class BindingsFake implements Bindings {
|
static class BindingsFake implements Bindings {
|
||||||
|
|
||||||
SimpleString name;
|
SimpleString name;
|
||||||
ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<>();
|
ConcurrentHashMap<String, Binding> bindings = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
BindingsFake(SimpleString address) {
|
BindingsFake(SimpleString address) {
|
||||||
this.name = address;
|
this.name = address;
|
||||||
|
@ -447,12 +447,12 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addBinding(Binding binding) {
|
public void addBinding(Binding binding) {
|
||||||
bindings.put(binding.getUniqueName(), binding);
|
bindings.put(String.valueOf(binding.getUniqueName()), binding);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Binding removeBindingByUniqueName(SimpleString uniqueName) {
|
public Binding removeBindingByUniqueName(SimpleString uniqueName) {
|
||||||
return bindings.remove(uniqueName);
|
return bindings.remove(String.valueOf(uniqueName));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -467,11 +467,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Binding getBinding(String name) {
|
public Binding getBinding(String name) {
|
||||||
return null;
|
return bindings.get(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void forEach(BiConsumer<SimpleString, Binding> bindingConsumer) {
|
public void forEach(BiConsumer<String, Binding> bindingConsumer) {
|
||||||
bindings.forEach(bindingConsumer);
|
bindings.forEach(bindingConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue