diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index a3270840a3..d41e8d3a6a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -430,9 +430,6 @@ public final class ActiveMQDefaultConfiguration { // its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false private static boolean DEFAULT_SCALE_DOWN_ENABLED = true; - // will the target node delete the store-and-forward queue for the scaled down node. - private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false; - // How long to wait for a decision private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000; @@ -1534,8 +1531,4 @@ public final class ActiveMQDefaultConfiguration { public static long getDefaultRetryReplicationWait() { return DEFAULT_RETRY_REPLICATION_WAIT; } - - public static boolean isDefaultCleanupSfQueue() { - return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE; - } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index ef2fae869e..a7a32539ff 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -277,7 +277,6 @@ public class PacketImpl implements Packet { public static final byte SESS_BINDINGQUERY_RESP_V4 = -15; - public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16; // Static -------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java index 56974603a1..a3149473a6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ConfigurationUtils.java @@ -125,9 +125,9 @@ public final class ConfigurationUtils { public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) { if (scaleDownConfiguration != null) { if (scaleDownConfiguration.getDiscoveryGroup() != null) { - return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue()); + return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled()); } else { - return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue()); + return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled()); } } return null; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java index d0ea7d6bd8..5f58e36bdc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ScaleDownConfiguration.java @@ -34,8 +34,6 @@ public class ScaleDownConfiguration implements Serializable { private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled(); - private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue(); - public List getConnectors() { return connectors; } @@ -85,13 +83,4 @@ public class ScaleDownConfiguration implements Serializable { this.enabled = enabled; return this; } - - public Boolean isCleanupSfQueue() { - return this.cleanupSfQueue; - } - - public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) { - this.cleanupSfQueue = cleanupSfQueue; - return this; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index bd095e0c29..2b4b481b20 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1579,8 +1579,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Element scaleDownElement = (Element) scaleDownNode.item(0); - scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue())); - scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled())); NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref"); @@ -1794,6 +1792,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO); + String scaleDownConnector = e.getAttribute("scale-down-connector"); + String discoveryGroupName = null; List staticConnectorNames = new ArrayList<>(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 853b3ae5a2..0428abebd0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage; @@ -77,7 +76,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT; -import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS; @@ -254,10 +252,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder { packet = new ScaleDownAnnounceMessage(); break; } - case SCALEDOWN_ANNOUNCEMENT_V2: { - packet = new ScaleDownAnnounceMessageV2(); - break; - } default: { packet = super.decode(packetType, connection); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java index 3c18adbca1..7a6f147e3e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessage.java @@ -22,17 +22,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; public class ScaleDownAnnounceMessage extends PacketImpl { - protected SimpleString targetNodeId; - protected SimpleString scaledDownNodeId; + private SimpleString targetNodeId; + private SimpleString scaledDownNodeId; public ScaleDownAnnounceMessage() { super(SCALEDOWN_ANNOUNCEMENT); } - public ScaleDownAnnounceMessage(byte type) { - super(type); - } - public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) { super(SCALEDOWN_ANNOUNCEMENT); this.targetNodeId = targetNodeId; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java deleted file mode 100644 index a5c09cd645..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ScaleDownAnnounceMessageV2.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - -import org.apache.activemq.artemis.api.core.SimpleString; - -public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage { - - public ScaleDownAnnounceMessageV2() { - super(SCALEDOWN_ANNOUNCEMENT_V2); - } - - public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) { - this(); - this.targetNodeId = targetNodeId; - this.scaledDownNodeId = scaledDownNodeId; - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index adcb72e4e3..4d769f2751 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -202,6 +202,9 @@ public interface Queue extends Bindable,CriticalComponent { void deleteQueue(boolean removeConsumers) throws Exception; + /** This method will push a removeAddress call into server's remove address */ + void removeAddress() throws Exception; + void destroyPaging() throws Exception; long getMessageCount(); @@ -454,5 +457,4 @@ public interface Queue extends Bindable,CriticalComponent { } - boolean internalDelete(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 9c4ea18194..6171476188 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics; @@ -97,11 +96,4 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis * @return */ BridgeMetrics getBridgeMetrics(String nodeId); - - /** - * Remove the store-and-forward queue after scale down - */ - void removeSfQueue(SimpleString scaledDownNodeId); - - void removeSfQueue(Queue queue); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java index 991b8b3285..07f0fc283c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterControl.java @@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; -import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -196,10 +195,8 @@ public class ClusterControl implements AutoCloseable { return requestBackup(backupRequestMessage); } - public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) { - - ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId); - + public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) { + ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId); clusterChannel.send(announceMessage); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 572a919f64..86cd0df154 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.cluster; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -401,19 +400,12 @@ public class ClusterController implements ActiveMQComponent { Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()); ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString()); clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote)); - } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) { + } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) { ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet; //we don't really need to check as it should always be true if (server.getNodeID().equals(message.getTargetNodeId())) { server.addScaledDownNode(message.getScaledDownNodeId()); } - if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) { - ClusterManager clusterManager = ClusterController.this.server.getClusterManager(); - Set ccs = clusterManager.getClusterConnections(); - for (ClusterConnection cc : ccs) { - cc.removeSfQueue(message.getScaledDownNodeId()); - } - } } else if (channelHandler != null) { channelHandler.handlePacket(packet); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java index a7db3e6b05..0ef96d5322 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java @@ -41,25 +41,21 @@ public class ScaleDownPolicy { private boolean enabled; - private boolean isCleanupSfQueue; - public ScaleDownPolicy() { } - public ScaleDownPolicy(List connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) { + public ScaleDownPolicy(List connectors, String groupName, String clusterName, boolean enabled) { this.connectors = connectors; this.groupName = groupName; this.clusterName = clusterName; this.enabled = enabled; - this.isCleanupSfQueue = isCleanupSfQueue; } - public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) { + public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) { this.discoveryGroup = discoveryGroup; this.groupName = groupName; this.clusterName = clusterName; this.enabled = enabled; - this.isCleanupSfQueue = isCleanupSfQueue; } public List getConnectors() { @@ -128,8 +124,4 @@ public class ScaleDownPolicy { ActiveMQServer activeMQServer) { return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames); } - - public boolean isCleanupSfQueue() { - return this.isCleanupSfQueue; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 6c692e7601..c43610ed9b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -723,30 +723,35 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } if (scaleDownTargetNodeID != null && !scaleDownTargetNodeID.equals(nodeUUID.toString())) { - synchronized (this) { - try { - logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID); - ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID)); - - // stop the bridge from trying to reconnect and clean up all the bindings - fail(true); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - } - } + scaleDown(scaleDownTargetNodeID); } else if (scaleDownTargetNodeID != null) { // the disconnected node is scaling down to me, no need to reconnect to it logger.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect."); - fail(true); + fail(true, true); } else { logger.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID); - fail(me.getType() == ActiveMQExceptionType.DISCONNECTED); + fail(me.getType() == ActiveMQExceptionType.DISCONNECTED, false); } tryScheduleRetryReconnect(me.getType()); } + protected void scaleDown(String scaleDownTargetNodeID) { + synchronized (this) { + try { + logger.debug("Moving " + queue.getMessageCount() + " messages from " + queue.getName() + " to " + scaleDownTargetNodeID); + ((QueueImpl) queue).moveReferencesBetweenSnFQueues(SimpleString.toSimpleString(scaleDownTargetNodeID)); + + // stop the bridge from trying to reconnect and clean up all the bindings + fail(true, true); + + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + protected void tryScheduleRetryReconnect(final ActiveMQExceptionType type) { scheduleRetryConnect(); } @@ -865,7 +870,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return transformer; } - protected void fail(final boolean permanently) { + protected void fail(final boolean permanently, boolean scaleDown) { logger.debug(this + "\n\t::fail being called, permanently=" + permanently); //we need to make sure we remove the node from the topology so any incoming quorum requests are voted correctly if (targetNodeID != null) { @@ -1050,7 +1055,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } catch (Throwable ignored) { } } - fail(false); + fail(false, false); scheduleRetryConnect(); } } @@ -1069,7 +1074,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttemptsInUse) { ActiveMQServerLogger.LOGGER.bridgeAbortStart(name, retryCount, reconnectAttempts); - fail(true); + fail(true, false); return; } @@ -1113,9 +1118,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } - protected void postStop() { - } - // Inner classes ------------------------------------------------- @@ -1232,7 +1234,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue); } ActiveMQServerLogger.LOGGER.bridgeStopped(name); - postStop(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java index 5603dfdee2..ade5d0cb01 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java @@ -382,11 +382,6 @@ public class ClusterConnectionBridge extends BridgeImpl { super.nodeUP(member, last); } - @Override - protected void postStop() { - clusterConnection.removeSfQueue(queue); - } - @Override protected void afterConnect() throws Exception { @@ -402,13 +397,23 @@ public class ClusterConnectionBridge extends BridgeImpl { } @Override - protected void fail(final boolean permanently) { + protected void fail(final boolean permanently, final boolean scaleDown) { logger.debug("Cluster Bridge " + this.getName() + " failed, permanently=" + permanently); - super.fail(permanently); + super.fail(permanently, scaleDown); if (permanently) { logger.debug("cluster node for bridge " + this.getName() + " is permanently down"); clusterConnection.removeRecord(targetNodeID); + + if (scaleDown) { + try { + queue.deleteQueue(true); + queue.removeAddress(); + + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } } else { clusterConnection.disconnectRecord(targetNodeID); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index a8bf90ef3a..d6f34c91de 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -774,27 +774,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null; } - @Override - public void removeSfQueue(SimpleString scaledDownNodeId) { - SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString()); - Binding binding = server.getPostOffice().getBinding(sfQName); - - if (binding != null) { - removeSfQueue((Queue) binding.getBindable()); - } - } - - @Override - public void removeSfQueue(Queue queue) { - if (queue.internalDelete()) { - try { - server.removeAddressInfo(queue.getAddress(), null); - } catch (Exception e) { - logger.debug("Failed to remove sf address: " + queue.getAddress(), e); - } - } - } - private void createNewRecord(final long eventUID, final String targetNodeID, final TransportConfiguration connector, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java index 8d700ed6df..77b9f3f9c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BackupRecoveryJournalLoader.java @@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterController; -import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.transaction.ResourceManager; @@ -50,7 +49,6 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { private ActiveMQServer parentServer; private ServerLocator locator; private final ClusterController clusterController; - private ScaleDownPolicy scaleDownPolicy; public BackupRecoveryJournalLoader(PostOffice postOffice, PagingManager pagingManager, @@ -62,14 +60,12 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { Configuration configuration, ActiveMQServer parentServer, ServerLocatorInternal locator, - ClusterController clusterController, - ScaleDownPolicy scaleDownPolicy) { + ClusterController clusterController) { super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration); this.parentServer = parentServer; this.locator = locator; this.clusterController = clusterController; - this.scaleDownPolicy = scaleDownPolicy; } @Override @@ -91,12 +87,11 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader { public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map>> duplicateIDMap) throws Exception { - ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager()); locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator)); try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { - scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy); + scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID()); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java index bea060cf46..8dd160dfb4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java @@ -179,7 +179,6 @@ public class LiveOnlyActivation extends Activation { DuplicateIDCache duplicateIDCache = activeMQServer.getPostOffice().getDuplicateIDCache(address); duplicateIDMap.put(address, duplicateIDCache.getMap()); } - - return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null, this.liveOnlyPolicy.getScaleDownPolicy()); + return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2f6e93b3d9..f48e430a5a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -315,8 +315,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile long ringSize; - private Boolean removeSf; - /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -2077,6 +2075,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { deleteQueue(false); } + @Override + public void removeAddress() throws Exception { + server.removeAddressInfo(getAddress(), null); + } + @Override public void deleteQueue(boolean removeConsumers) throws Exception { synchronized (this) { @@ -2557,7 +2560,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { @Override public void setInternalQueue(boolean internalQueue) { this.internalQueue = internalQueue; - this.removeSf = null; } // Public @@ -3487,29 +3489,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - /** - * Delete the store and forward queue - * Only the second caller (if there is one) of this method does the actual deletion. - * The logic makes sure the sf queue is deleted only after bridge is stopped. - */ - @Override - public synchronized boolean internalDelete() { - if (this.isInternalQueue()) { - if (removeSf == null) { - removeSf = false; - } else if (removeSf == false) { - try { - deleteQueue(); - removeSf = true; - return true; - } catch (Exception e) { - logger.debug("Error removing sf queue " + getName(), e); - } - } - } - return false; - } - private boolean checkExpired(final MessageReference reference) { try { if (reference.getMessage().isExpired()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index d84d1999ab..db51dcc42a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterController; -import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.transaction.ResourceManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionOperation; @@ -92,15 +91,14 @@ public class ScaleDownHandler { ResourceManager resourceManager, Map>> duplicateIDMap, SimpleString managementAddress, - SimpleString targetNodeId, - ScaleDownPolicy scaleDownPolicy) throws Exception { + SimpleString targetNodeId) throws Exception { ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory); clusterControl.authorize(); long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword()); ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num); scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword()); scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword()); - clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue()); + clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId()); return num; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 7adc190f72..587b8f0181 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends Activation { Configuration configuration, ActiveMQServer parentServer) throws ActiveMQException { if (replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled()) { - return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), replicaPolicy.getScaleDownPolicy()); + return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController()); } else { return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java index 20ad7b39ef..c978ff6e0f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java @@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends Activation { Configuration configuration, ActiveMQServer parentServer) throws ActiveMQException { if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) { - return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), sharedStoreSlavePolicy.getScaleDownPolicy()); + return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController()); } else { return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer); } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index fc76b6f36b..fea5cc6f65 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -2881,13 +2881,6 @@ - - - - Tells the target node whether delete the store and forward queue after scale down. - - - diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 4129ee641f..6e41d1c86a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -284,7 +284,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { " \n" + " server0-connector\n" + " \n" + - " true\n" + " \n" + " \n" + "\n" + lastPart; @@ -297,7 +296,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig; ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration(); - assertTrue(scaledownCfg.isCleanupSfQueue()); List connectors = scaledownCfg.getConnectors(); assertEquals(1, connectors.size()); String connector = connectors.get(0); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 352688d0aa..e0f637252e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -299,7 +299,6 @@ public class FileConfigurationTest extends ConfigurationImplTest { assertNotNull(lopc.getScaleDownConfiguration()); assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!"); assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1"); - assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue()); for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) { if (ccc.getName().equals("cluster-connection1")) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index e266ef5bdd..b3ae24007a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -794,6 +794,10 @@ public class ScheduledDeliveryHandlerTest extends Assert { } + @Override + public void removeAddress() throws Exception { + } + @Override public long getAcknowledgeAttempts() { return 0; @@ -888,11 +892,6 @@ public class ScheduledDeliveryHandlerTest extends Assert { public void recheckRefCount(OperationContext context) { } - @Override - public boolean internalDelete() { - return false; - } - @Override public void unproposed(SimpleString groupID) { diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md index 6df4648db3..1b82426847 100644 --- a/docs/user-manual/en/ha.md +++ b/docs/user-manual/en/ha.md @@ -695,30 +695,6 @@ transactions are there for the client when it reconnects. The normal reconnect settings apply when the client is reconnecting so these should be high enough to deal with the time needed to scale down. -#### Automatic Deleting Store-and-Forward Queue after Scale Down - -By default after the node is scaled down to a target node the internal -SF queue is not deleted. There is a boolean configuration parameter called -"cleanup-sf-queue" that can be used in case you want to delete it. - -To do so you need to add this parameter to the scale-down policy and -set it to "true". For example: - -```xml - - - - ... - true - - - -``` - -With the above config in place when the scale down node is -stopped, it will send a message to the target node once the scale down -is complete. The target node will then properly delete the SF queue and its address. - ## Failover Modes Apache ActiveMQ Artemis defines two types of client failover: diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java index c695b83f10..ebbef7dc32 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDown3NodeTest.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -76,6 +77,10 @@ public class ScaleDown3NodeTest extends ClusterTestBase { IntegrationTestLogger.LOGGER.info("Node 1: " + servers[1].getClusterManager().getNodeId()); IntegrationTestLogger.LOGGER.info("Node 2: " + servers[2].getClusterManager().getNodeId()); IntegrationTestLogger.LOGGER.info("==============================="); + + servers[0].setIdentity("Node0"); + servers[1].setIdentity("Node1"); + servers[2].setIdentity("Node2"); } protected boolean isNetty() { @@ -117,7 +122,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase { createQueue(2, addressName, queueName1, null, false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword()); // pause the SnF queue so that when the server tries to redistribute a message it won't actually go across the cluster bridge - String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString(); + final String snfAddress = servers[0].getInternalNamingPrefix() + "sf.cluster0." + servers[0].getNodeID().toString(); Queue snfQueue = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))).getQueue(); snfQueue.pause(); @@ -156,20 +161,7 @@ public class ScaleDown3NodeTest extends ClusterTestBase { // add a consumer to node 0 to trigger redistribution here addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword()); - // allow some time for redistribution to move the message to the SnF queue - long timeout = 10000; - long start = System.currentTimeMillis(); - long messageCount = 0; - - while (System.currentTimeMillis() - start < timeout) { - // ensure the message is not in the queue on node 2 - messageCount = getMessageCount(snfQueue); - if (messageCount < TEST_SIZE) { - Thread.sleep(200); - } else { - break; - } - } + Wait.assertEquals(TEST_SIZE, snfQueue::getMessageCount); // ensure the message is in the SnF queue Assert.assertEquals(TEST_SIZE, getMessageCount(snfQueue)); @@ -179,37 +171,16 @@ public class ScaleDown3NodeTest extends ClusterTestBase { removeConsumer(0); servers[0].stop(); - start = System.currentTimeMillis(); + Queue queueServer2 = ((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue(); - while (System.currentTimeMillis() - start < timeout) { - // ensure the message is not in the queue on node 2 - messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()); - if (messageCount > 0) { - Thread.sleep(200); - } else { - break; - } - } - - Assert.assertEquals(0, messageCount); + Wait.assertEquals(0, queueServer2::getMessageCount); // get the messages from queue 1 on node 1 addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword()); - // allow some time for redistribution to move the message to node 1 - start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout) { - // ensure the message is not in the queue on node 2 - messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()); - if (messageCount < TEST_SIZE) { - Thread.sleep(200); - } else { - break; - } - } - // ensure the message is in queue 1 on node 1 as expected - Assert.assertEquals(TEST_SIZE, messageCount); + Queue queueServer1 = ((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue(); + Wait.assertEquals(TEST_SIZE, queueServer1::getMessageCount); for (int i = 0; i < TEST_SIZE; i++) { ClientMessage clientMessage = consumers[0].getConsumer().receive(250); @@ -229,6 +200,12 @@ public class ScaleDown3NodeTest extends ClusterTestBase { ClientMessage clientMessage = consumers[0].getConsumer().receive(250); Assert.assertNull(clientMessage); removeConsumer(0); + + Wait.assertTrue(() -> (servers[2].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null); + Wait.assertTrue(() -> (servers[1].getPostOffice().getBinding(SimpleString.toSimpleString(snfAddress))) == null); + + Assert.assertFalse(servers[1].queueQuery(SimpleString.toSimpleString(snfAddress)).isExists()); + Assert.assertFalse(servers[1].addressQuery(SimpleString.toSimpleString(snfAddress)).isExists()); } @Test @@ -278,23 +255,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase { addConsumer(0, 0, queueName1, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword()); addConsumer(1, 0, queueName3, null, true, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword()); - // allow some time for redistribution to move the message to the SnF queue - long timeout = 10000; - long start = System.currentTimeMillis(); - long messageCount = 0; - - while (System.currentTimeMillis() - start < timeout) { - // ensure the message is not in the queue on node 2 - messageCount = getMessageCount(snfQueue); - if (messageCount < TEST_SIZE * 2) { - Thread.sleep(200); - } else { - break; - } - } - // ensure the message is in the SnF queue - Assert.assertEquals(TEST_SIZE * 2, getMessageCount(snfQueue)); + Wait.assertEquals(TEST_SIZE * 2, snfQueue::getMessageCount); // trigger scaleDown from node 0 to node 1 IntegrationTestLogger.LOGGER.info("============ Stopping " + servers[0].getNodeID()); @@ -302,20 +264,8 @@ public class ScaleDown3NodeTest extends ClusterTestBase { removeConsumer(1); servers[0].stop(); - start = System.currentTimeMillis(); - - while (System.currentTimeMillis() - start < timeout) { - // ensure the messages are not in the queues on node 2 - messageCount = getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()); - messageCount += getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()); - if (messageCount > 0) { - Thread.sleep(200); - } else { - break; - } - } - - Assert.assertEquals(0, messageCount); + Wait.assertEquals(0, () -> getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) + + getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue())); Assert.assertEquals(TEST_SIZE, getMessageCount(((LocalQueueBinding) servers[2].getPostOffice().getBinding(new SimpleString(queueName2))).getQueue())); @@ -323,21 +273,9 @@ public class ScaleDown3NodeTest extends ClusterTestBase { addConsumer(0, 1, queueName1, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword()); addConsumer(1, 1, queueName3, null, true, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword()); - // allow some time for redistribution to move the message to node 1 - start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout) { - // ensure the message is not in the queue on node 2 - messageCount = getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()); - messageCount += getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue()); - if (messageCount < TEST_SIZE * 2) { - Thread.sleep(200); - } else { - break; - } - } - + Wait.assertEquals(TEST_SIZE * 2, () -> getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()) + + getMessageCount(((LocalQueueBinding) servers[1].getPostOffice().getBinding(new SimpleString(queueName3))).getQueue())); // ensure the message is in queue 1 on node 1 as expected - Assert.assertEquals(TEST_SIZE * 2, messageCount); for (int i = 0; i < TEST_SIZE; i++) { ClientMessage clientMessage = consumers[0].getConsumer().receive(1000); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java index 92d26357d4..145f31d45e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownRemoveSFTest.java @@ -21,8 +21,6 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; -import org.apache.activemq.artemis.core.server.AddressQueryResult; -import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -31,35 +29,19 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import java.util.Arrays; -import java.util.Collection; -@RunWith(value = Parameterized.class) public class ScaleDownRemoveSFTest extends ClusterTestBase { - @Parameterized.Parameters(name = "RemoveOption={0}") - public static Collection getParameters() { - return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}}); + public ScaleDownRemoveSFTest() { } - public ScaleDownRemoveSFTest(String option) { - this.option = option; - } - - private String option; - @Override @Before public void setUp() throws Exception { super.setUp(); ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration(); - if (!"default".equals(option)) { - scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option)); - } setupLiveServer(0, isFileStorage(), isNetty(), true); setupLiveServer(1, isFileStorage(), isNetty(), true); LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration(); @@ -116,8 +98,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase { SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString()); System.out.println("[sf queue on server 1]: " + sfQueueName); - QueueQueryResult result = servers[1].queueQuery(sfQueueName); - assertTrue(result.isExists()); + + Assert.assertTrue(servers[1].queueQuery(sfQueueName).isExists()); // trigger scaleDown from node 0 to node 1 servers[0].stop(); @@ -133,15 +115,8 @@ public class ScaleDownRemoveSFTest extends ClusterTestBase { removeConsumer(0); //check - result = servers[1].queueQuery(sfQueueName); - AddressQueryResult result2 = servers[1].addressQuery(sfQueueName); - if ("true".equals(option)) { - assertFalse(result.isExists()); - assertFalse(result2.isExists()); - } else { - assertTrue(result.isExists()); - assertTrue(result2.isExists()); - } + Assert.assertFalse(servers[1].queueQuery(sfQueueName).isExists()); + Assert.assertFalse(servers[1].addressQuery(sfQueueName).isExists()); } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 7b7890fc39..57953432f9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -59,6 +59,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } + @Override + public void removeAddress() throws Exception { + + } + @Override public long getDelayBeforeDispatch() { return 0; @@ -196,11 +201,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { } - @Override - public boolean internalDelete() { - return false; - } - @Override public boolean isPersistedPause() { return false;