This closes #2841
This commit is contained in:
commit
320381a2c6
|
@ -430,9 +430,6 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
|
||||
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
|
||||
|
||||
// will the target node delete the store-and-forward queue for the scaled down node.
|
||||
private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;
|
||||
|
||||
// How long to wait for a decision
|
||||
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
|
||||
|
||||
|
@ -1534,8 +1531,4 @@ public final class ActiveMQDefaultConfiguration {
|
|||
public static long getDefaultRetryReplicationWait() {
|
||||
return DEFAULT_RETRY_REPLICATION_WAIT;
|
||||
}
|
||||
|
||||
public static boolean isDefaultCleanupSfQueue() {
|
||||
return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,7 +277,6 @@ public class PacketImpl implements Packet {
|
|||
|
||||
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
||||
|
||||
public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
|
|
|
@ -125,9 +125,9 @@ public final class ConfigurationUtils {
|
|||
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
|
||||
if (scaleDownConfiguration != null) {
|
||||
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
|
||||
} else {
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -34,8 +34,6 @@ public class ScaleDownConfiguration implements Serializable {
|
|||
|
||||
private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
|
||||
|
||||
private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();
|
||||
|
||||
public List<String> getConnectors() {
|
||||
return connectors;
|
||||
}
|
||||
|
@ -85,13 +83,4 @@ public class ScaleDownConfiguration implements Serializable {
|
|||
this.enabled = enabled;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Boolean isCleanupSfQueue() {
|
||||
return this.cleanupSfQueue;
|
||||
}
|
||||
|
||||
public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
|
||||
this.cleanupSfQueue = cleanupSfQueue;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1579,8 +1579,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
Element scaleDownElement = (Element) scaleDownNode.item(0);
|
||||
|
||||
scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));
|
||||
|
||||
scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));
|
||||
|
||||
NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");
|
||||
|
@ -1794,6 +1792,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
|
||||
|
||||
String scaleDownConnector = e.getAttribute("scale-down-connector");
|
||||
|
||||
String discoveryGroupName = null;
|
||||
|
||||
List<String> staticConnectorNames = new ArrayList<>();
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
|
@ -77,7 +76,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
|
|||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
|
||||
|
@ -254,10 +252,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
packet = new ScaleDownAnnounceMessage();
|
||||
break;
|
||||
}
|
||||
case SCALEDOWN_ANNOUNCEMENT_V2: {
|
||||
packet = new ScaleDownAnnounceMessageV2();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
|
|
|
@ -22,17 +22,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|||
|
||||
public class ScaleDownAnnounceMessage extends PacketImpl {
|
||||
|
||||
protected SimpleString targetNodeId;
|
||||
protected SimpleString scaledDownNodeId;
|
||||
private SimpleString targetNodeId;
|
||||
private SimpleString scaledDownNodeId;
|
||||
|
||||
public ScaleDownAnnounceMessage() {
|
||||
super(SCALEDOWN_ANNOUNCEMENT);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessage(byte type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
super(SCALEDOWN_ANNOUNCEMENT);
|
||||
this.targetNodeId = targetNodeId;
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {
|
||||
|
||||
public ScaleDownAnnounceMessageV2() {
|
||||
super(SCALEDOWN_ANNOUNCEMENT_V2);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
this();
|
||||
this.targetNodeId = targetNodeId;
|
||||
this.scaledDownNodeId = scaledDownNodeId;
|
||||
}
|
||||
}
|
|
@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
|
||||
void deleteQueue(boolean removeConsumers) throws Exception;
|
||||
|
||||
/** This method will push a removeAddress call into server's remove address */
|
||||
void removeAddress() throws Exception;
|
||||
|
||||
void destroyPaging() throws Exception;
|
||||
|
||||
long getMessageCount();
|
||||
|
@ -454,5 +457,4 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
|
||||
}
|
||||
|
||||
boolean internalDelete();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
|||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
|
||||
|
||||
|
@ -97,11 +96,4 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
|
|||
* @return
|
||||
*/
|
||||
BridgeMetrics getBridgeMetrics(String nodeId);
|
||||
|
||||
/**
|
||||
* Remove the store-and-forward queue after scale down
|
||||
*/
|
||||
void removeSfQueue(SimpleString scaledDownNodeId);
|
||||
|
||||
void removeSfQueue(Queue queue);
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
@ -196,10 +195,8 @@ public class ClusterControl implements AutoCloseable {
|
|||
return requestBackup(backupRequestMessage);
|
||||
}
|
||||
|
||||
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) {
|
||||
|
||||
ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
|
||||
|
||||
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
|
||||
clusterChannel.send(announceMessage);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.cluster;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -401,19 +400,12 @@ public class ClusterController implements ActiveMQComponent {
|
|||
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
||||
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
|
||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
||||
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
||||
//we don't really need to check as it should always be true
|
||||
if (server.getNodeID().equals(message.getTargetNodeId())) {
|
||||
server.addScaledDownNode(message.getScaledDownNodeId());
|
||||
}
|
||||
if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
|
||||
ClusterManager clusterManager = ClusterController.this.server.getClusterManager();
|
||||
Set<ClusterConnection> ccs = clusterManager.getClusterConnections();
|
||||
for (ClusterConnection cc : ccs) {
|
||||
cc.removeSfQueue(message.getScaledDownNodeId());
|
||||
}
|
||||
}
|
||||
} else if (channelHandler != null) {
|
||||
channelHandler.handlePacket(packet);
|
||||
}
|
||||
|
|
|
@ -41,25 +41,21 @@ public class ScaleDownPolicy {
|
|||
|
||||
private boolean enabled;
|
||||
|
||||
private boolean isCleanupSfQueue;
|
||||
|
||||
public ScaleDownPolicy() {
|
||||
}
|
||||
|
||||
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
|
||||
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
|
||||
this.connectors = connectors;
|
||||
this.groupName = groupName;
|
||||
this.clusterName = clusterName;
|
||||
this.enabled = enabled;
|
||||
this.isCleanupSfQueue = isCleanupSfQueue;
|
||||
}
|
||||
|
||||
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
|
||||
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
|
||||
this.discoveryGroup = discoveryGroup;
|
||||
this.groupName = groupName;
|
||||
this.clusterName = clusterName;
|
||||
this.enabled = enabled;
|
||||
this.isCleanupSfQueue = isCleanupSfQueue;
|
||||
}
|
||||
|
||||
public List<String> getConnectors() {
|
||||
|
@ -128,8 +124,4 @@ public class ScaleDownPolicy {
|
|||
ActiveMQServer activeMQServer) {
|
||||
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
|
||||
}
|
||||
|
||||
public boolean isCleanupSfQueue() {
|
||||
return this.isCleanupSfQueue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
}
|
||||
|
||||
if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) {
|
||||
synchronized (this) {
|
||||
try {
|
||||
logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
|
||||
((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
|
||||
|
||||
// stop the bridge from trying to reconnect and clean up all the bindings
|
||||
fail(true);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
scaleDown(scaleDownTargetNodeID);
|
||||
} else if (scaleDownTargetNodeID != null) {
|
||||
// the disconnected node is scaling down to me, no need to reconnect to it
|
||||
logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect.");
|
||||
fail(true);
|
||||
fail(true, true);
|
||||
} else {
|
||||
logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID);
|
||||
|
||||
fail(me.getType() == ActiveMQExceptionType.DISCONNECTED);
|
||||
fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false);
|
||||
}
|
||||
|
||||
tryScheduleRetryReconnect(me.getType());
|
||||
}
|
||||
|
||||
protected void scaleDown(String scaleDownTargetNodeID) {
|
||||
synchronized (this) {
|
||||
try {
|
||||
logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID);
|
||||
((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID));
|
||||
|
||||
// stop the bridge from trying to reconnect and clean up all the bindings
|
||||
fail(true, true);
|
||||
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) {
|
||||
scheduleRetryConnect();
|
||||
}
|
||||
|
@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
return transformer;
|
||||
}
|
||||
|
||||
protected void fail(final boolean permanently) {
|
||||
protected void fail(final boolean permanently, boolean scaleDown) {
|
||||
logger.debug(this + "\n\t::fail being called, permanently=" + permanently);
|
||||
//we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly
|
||||
if (targetNodeID != null) {
|
||||
|
@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
} catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
fail(false);
|
||||
fail(false, false);
|
||||
scheduleRetryConnect();
|
||||
}
|
||||
}
|
||||
|
@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) {
|
||||
ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts);
|
||||
fail(true);
|
||||
fail(true, false);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1113,9 +1118,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
}
|
||||
|
||||
protected void postStop() {
|
||||
}
|
||||
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
|
@ -1232,7 +1234,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
|
||||
postStop();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -382,11 +382,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
super.nodeUP(member, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postStop() {
|
||||
clusterConnection.removeSfQueue(queue);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void afterConnect() throws Exception {
|
||||
|
@ -402,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void fail(final boolean permanently) {
|
||||
protected void fail(final boolean permanently, final boolean scaleDown) {
|
||||
logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently);
|
||||
super.fail(permanently);
|
||||
super.fail(permanently, scaleDown);
|
||||
|
||||
if (permanently) {
|
||||
logger.debug("cluster node for bridge " + this.getName() + " is permanently down");
|
||||
clusterConnection.removeRecord(targetNodeID);
|
||||
|
||||
if (scaleDown) {
|
||||
try {
|
||||
queue.deleteQueue(true);
|
||||
queue.removeAddress();
|
||||
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
clusterConnection.disconnectRecord(targetNodeID);
|
||||
}
|
||||
|
|
|
@ -774,27 +774,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSfQueue(SimpleString scaledDownNodeId) {
|
||||
SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString());
|
||||
Binding binding = server.getPostOffice().getBinding(sfQName);
|
||||
|
||||
if (binding != null) {
|
||||
removeSfQueue((Queue) binding.getBindable());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSfQueue(Queue queue) {
|
||||
if (queue.internalDelete()) {
|
||||
try {
|
||||
server.removeAddressInfo(queue.getAddress(), null);
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to remove sf address: " + queue.getAddress(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewRecord(final long eventUID,
|
||||
final String targetNodeID,
|
||||
final TransportConfiguration connector,
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
|
|||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
|
||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
|
@ -50,7 +49,6 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
private ActiveMQServer parentServer;
|
||||
private ServerLocator locator;
|
||||
private final ClusterController clusterController;
|
||||
private ScaleDownPolicy scaleDownPolicy;
|
||||
|
||||
public BackupRecoveryJournalLoader(PostOffice postOffice,
|
||||
PagingManager pagingManager,
|
||||
|
@ -62,14 +60,12 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer,
|
||||
ServerLocatorInternal locator,
|
||||
ClusterController clusterController,
|
||||
ScaleDownPolicy scaleDownPolicy) {
|
||||
ClusterController clusterController) {
|
||||
|
||||
super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
|
||||
this.parentServer = parentServer;
|
||||
this.locator = locator;
|
||||
this.clusterController = clusterController;
|
||||
this.scaleDownPolicy = scaleDownPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,12 +87,11 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
public void postLoad(Journal messageJournal,
|
||||
ResourceManager resourceManager,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
|
||||
|
||||
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
|
||||
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
|
||||
|
||||
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
|
||||
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy);
|
||||
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -179,7 +179,6 @@ public class LiveOnlyActivation extends Activation {
|
|||
DuplicateIDCache duplicateIDCache = activeMQServer.getPostOffice().getDuplicateIDCache(address);
|
||||
duplicateIDMap.put(address, duplicateIDCache.getMap());
|
||||
}
|
||||
|
||||
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null, this.liveOnlyPolicy.getScaleDownPolicy());
|
||||
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -315,8 +315,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private volatile long ringSize;
|
||||
|
||||
private Boolean removeSf;
|
||||
|
||||
/**
|
||||
* This is to avoid multi-thread races on calculating direct delivery,
|
||||
* to guarantee ordering will be always be correct
|
||||
|
@ -2077,6 +2075,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
deleteQueue(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAddress() throws Exception {
|
||||
server.removeAddressInfo(getAddress(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteQueue(boolean removeConsumers) throws Exception {
|
||||
synchronized (this) {
|
||||
|
@ -2557,7 +2560,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
@Override
|
||||
public void setInternalQueue(boolean internalQueue) {
|
||||
this.internalQueue = internalQueue;
|
||||
this.removeSf = null;
|
||||
}
|
||||
|
||||
// Public
|
||||
|
@ -3487,29 +3489,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the store and forward queue
|
||||
* Only the second caller (if there is one) of this method does the actual deletion.
|
||||
* The logic makes sure the sf queue is deleted only after bridge is stopped.
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean internalDelete() {
|
||||
if (this.isInternalQueue()) {
|
||||
if (removeSf == null) {
|
||||
removeSf = false;
|
||||
} else if (removeSf == false) {
|
||||
try {
|
||||
deleteQueue();
|
||||
removeSf = true;
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
logger.debug("Error removing sf queue " + getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean checkExpired(final MessageReference reference) {
|
||||
try {
|
||||
if (reference.getMessage().isExpired()) {
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
|
||||
|
@ -92,15 +91,14 @@ public class ScaleDownHandler {
|
|||
ResourceManager resourceManager,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
|
||||
SimpleString managementAddress,
|
||||
SimpleString targetNodeId,
|
||||
ScaleDownPolicy scaleDownPolicy) throws Exception {
|
||||
SimpleString targetNodeId) throws Exception {
|
||||
ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
|
||||
clusterControl.authorize();
|
||||
long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num);
|
||||
scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue());
|
||||
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
|
||||
return num;
|
||||
}
|
||||
|
||||
|
|
|
@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer) throws ActiveMQException {
|
||||
if (replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled()) {
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), replicaPolicy.getScaleDownPolicy());
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
|
||||
} else {
|
||||
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
|
||||
}
|
||||
|
|
|
@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends Activation {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer) throws ActiveMQException {
|
||||
if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) {
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), sharedStoreSlavePolicy.getScaleDownPolicy());
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
|
||||
} else {
|
||||
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
|
||||
}
|
||||
|
|
|
@ -2881,13 +2881,6 @@
|
|||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
</xsd:choice>
|
||||
<xsd:element name="cleanup-sf-queue" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Tells the target node whether delete the store and forward queue after scale down.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:sequence>
|
||||
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
||||
</xsd:complexType>
|
||||
|
|
|
@ -284,7 +284,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
" <connectors>\n" +
|
||||
" <connector-ref>server0-connector</connector-ref>\n" +
|
||||
" </connectors>\n" +
|
||||
" <cleanup-sf-queue>true</cleanup-sf-queue>\n" +
|
||||
" </scale-down>\n" +
|
||||
" </live-only>\n" +
|
||||
"</ha-policy>\n" + lastPart;
|
||||
|
@ -297,7 +296,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
|
||||
LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
|
||||
ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
|
||||
assertTrue(scaledownCfg.isCleanupSfQueue());
|
||||
List<String> connectors = scaledownCfg.getConnectors();
|
||||
assertEquals(1, connectors.size());
|
||||
String connector = connectors.get(0);
|
||||
|
|
|
@ -299,7 +299,6 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertNotNull(lopc.getScaleDownConfiguration());
|
||||
assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!");
|
||||
assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1");
|
||||
assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue());
|
||||
|
||||
for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
|
||||
if (ccc.getName().equals("cluster-connection1")) {
|
||||
|
|
|
@ -794,6 +794,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAddress() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAcknowledgeAttempts() {
|
||||
return 0;
|
||||
|
@ -888,11 +892,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
public void recheckRefCount(OperationContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean internalDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unproposed(SimpleString groupID) {
|
||||
|
||||
|
|
|
@ -695,30 +695,6 @@ transactions are there for the client when it reconnects. The normal
|
|||
reconnect settings apply when the client is reconnecting so these should
|
||||
be high enough to deal with the time needed to scale down.
|
||||
|
||||
#### Automatic Deleting Store-and-Forward Queue after Scale Down
|
||||
|
||||
By default after the node is scaled down to a target node the internal
|
||||
SF queue is not deleted. There is a boolean configuration parameter called
|
||||
"cleanup-sf-queue" that can be used in case you want to delete it.
|
||||
|
||||
To do so you need to add this parameter to the scale-down policy and
|
||||
set it to "true". For example:
|
||||
|
||||
```xml
|
||||
<ha-policy>
|
||||
<live-only>
|
||||
<scale-down>
|
||||
...
|
||||
<cleanup-sf-queue>true</cleanup-sf-queue>
|
||||
</scale-down>
|
||||
</live-only>
|
||||
</ha-policy>
|
||||
```
|
||||
|
||||
With the above config in place when the scale down node is
|
||||
stopped, it will send a message to the target node once the scale down
|
||||
is complete. The target node will then properly delete the SF queue and its address.
|
||||
|
||||
## Failover Modes
|
||||
|
||||
Apache ActiveMQ Artemis defines two types of client failover:
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
|||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId());
|
||||
IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId());
|
||||
IntegrationTestLogger.LOGGER.info("===============================");
|
||||
|
||||
servers[0].setIdentity("Node0");
|
||||
servers[1].setIdentity("Node1");
|
||||
servers[2].setIdentity("Node2");
|
||||
}
|
||||
|
||||
protected boolean isNetty() {
|
||||
|
@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
|
||||
|
||||
// pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge
|
||||
String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
|
||||
final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString();
|
||||
Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue();
|
||||
snfQueue.pause();
|
||||
|
||||
|
@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
// add a consumer to node 0 to trigger redistribution here
|
||||
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
|
||||
|
||||
// allow some time for redistribution to move the message to the SnF queue
|
||||
long timeout = 10000;
|
||||
long start = System.currentTimeMillis();
|
||||
long messageCount = 0;
|
||||
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the message is not in the queue on node 2
|
||||
messageCount = getMessageCount(snfQueue);
|
||||
if (messageCount < TEST_SIZE) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount);
|
||||
|
||||
// ensure the message is in the SnF queue
|
||||
Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue));
|
||||
|
@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
removeConsumer(0);
|
||||
servers[0].stop();
|
||||
|
||||
start = System.currentTimeMillis();
|
||||
Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
|
||||
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the message is not in the queue on node 2
|
||||
messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
|
||||
if (messageCount > 0) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, messageCount);
|
||||
Wait.assertEquals(0, queueServer2::getMessageCount);
|
||||
|
||||
// get the messages from queue 1 on node 1
|
||||
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
|
||||
|
||||
// allow some time for redistribution to move the message to node 1
|
||||
start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the message is not in the queue on node 2
|
||||
messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
|
||||
if (messageCount < TEST_SIZE) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// ensure the message is in queue 1 on node 1 as expected
|
||||
Assert.assertEquals(TEST_SIZE, messageCount);
|
||||
Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue();
|
||||
Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount);
|
||||
|
||||
for (int i = 0; i < TEST_SIZE; i++) {
|
||||
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
|
||||
|
@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
ClientMessage clientMessage = consumers[0].getConsumer().receive(250);
|
||||
Assert.assertNull(clientMessage);
|
||||
removeConsumer(0);
|
||||
|
||||
Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
|
||||
Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null);
|
||||
|
||||
Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists());
|
||||
Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
|
||||
addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
|
||||
|
||||
// allow some time for redistribution to move the message to the SnF queue
|
||||
long timeout = 10000;
|
||||
long start = System.currentTimeMillis();
|
||||
long messageCount = 0;
|
||||
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the message is not in the queue on node 2
|
||||
messageCount = getMessageCount(snfQueue);
|
||||
if (messageCount < TEST_SIZE * 2) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// ensure the message is in the SnF queue
|
||||
Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue));
|
||||
Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount);
|
||||
|
||||
// trigger scaleDown from node 0 to node 1
|
||||
IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID());
|
||||
|
@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
removeConsumer(1);
|
||||
servers[0].stop();
|
||||
|
||||
start = System.currentTimeMillis();
|
||||
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the messages are not in the queues on node 2
|
||||
messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
|
||||
messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
|
||||
if (messageCount > 0) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, messageCount);
|
||||
Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
|
||||
getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
|
||||
|
||||
Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue()));
|
||||
|
||||
|
@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase {
|
|||
addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
|
||||
addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
|
||||
|
||||
// allow some time for redistribution to move the message to node 1
|
||||
start = System.currentTimeMillis();
|
||||
while (System.currentTimeMillis() - start < timeout) {
|
||||
// ensure the message is not in the queue on node 2
|
||||
messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue());
|
||||
messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue());
|
||||
if (messageCount < TEST_SIZE * 2) {
|
||||
Thread.sleep(200);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) +
|
||||
getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()));
|
||||
// ensure the message is in queue 1 on node 1 as expected
|
||||
Assert.assertEquals(TEST_SIZE * 2, messageCount);
|
||||
|
||||
for (int i = 0; i < TEST_SIZE; i++) {
|
||||
ClientMessage clientMessage = consumers[0].getConsumer().receive(1000);
|
||||
|
|
|
@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
|||
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -31,35 +29,19 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
||||
|
||||
@Parameterized.Parameters(name = "RemoveOption={0}")
|
||||
public static Collection getParameters() {
|
||||
return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}});
|
||||
public ScaleDownRemoveSFTest() {
|
||||
}
|
||||
|
||||
public ScaleDownRemoveSFTest(String option) {
|
||||
this.option = option;
|
||||
}
|
||||
|
||||
private String option;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
|
||||
if (!"default".equals(option)) {
|
||||
scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option));
|
||||
}
|
||||
setupLiveServer(0, isFileStorage(), isNetty(), true);
|
||||
setupLiveServer(1, isFileStorage(), isNetty(), true);
|
||||
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
|
||||
|
@ -116,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
|||
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
|
||||
|
||||
System.out.println("[sf queue on server 1]: " + sfQueueName);
|
||||
QueueQueryResult result = servers[1].queueQuery(sfQueueName);
|
||||
assertTrue(result.isExists());
|
||||
|
||||
Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists());
|
||||
|
||||
// trigger scaleDown from node 0 to node 1
|
||||
servers[0].stop();
|
||||
|
@ -133,15 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
|||
removeConsumer(0);
|
||||
|
||||
//check
|
||||
result = servers[1].queueQuery(sfQueueName);
|
||||
AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
|
||||
if ("true".equals(option)) {
|
||||
assertFalse(result.isExists());
|
||||
assertFalse(result2.isExists());
|
||||
} else {
|
||||
assertTrue(result.isExists());
|
||||
assertTrue(result2.isExists());
|
||||
}
|
||||
Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists());
|
||||
Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -59,6 +59,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAddress() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelayBeforeDispatch() {
|
||||
return 0;
|
||||
|
@ -196,11 +201,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean internalDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPersistedPause() {
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue