ARTEMIS-2462 Allow store-forward queue to be deleted afte scaledown

After a node is scaled down to a target node, the sf queue in the
target node is not deleted.

Normally this is fine because may be reused when the scaled down
node is back up.

However in cloud environment many drainer pods can be created and
then shutdown in order to drain the messages to a live node (pod).
Each drainer pod will have a different node-id. Over time the sf
queues in the target broker node grows and those sf queues are
no longer reused.

Although use can use management API/console to manually delete
them, it would be nice to have an option to automatically delete
those sf queue/address resources after scale down.

In this PR it added a boolean configuration parameter called
cleanup-sf-queue to scale down policy so that if the parameter
is "true" the broker will send a message to the
target broker signalling that the SF queue is no longer
needed and should be deleted.

If the parameter is not defined (default) or is "false"
the scale down won't remove the sf queue.
This commit is contained in:
Howard Gao 2019-08-28 21:16:02 +08:00
parent 3a58387bd3
commit 397cef699a
29 changed files with 399 additions and 19 deletions

View File

@ -430,6 +430,9 @@ public final class ActiveMQDefaultConfiguration {
// its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
// will the target node delete the store-and-forward queue for the scaled down node.
private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;
// How long to wait for a decision
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
@ -1531,4 +1534,8 @@ public final class ActiveMQDefaultConfiguration {
public static long getDefaultRetryReplicationWait() {
return DEFAULT_RETRY_REPLICATION_WAIT;
}
public static boolean isDefaultCleanupSfQueue() {
return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
}
}

View File

@ -277,6 +277,7 @@ public class PacketImpl implements Packet {
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;
// Static --------------------------------------------------------

View File

@ -125,9 +125,9 @@ public final class ConfigurationUtils {
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
if (scaleDownConfiguration != null) {
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
} else {
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
}
}
return null;

View File

@ -34,6 +34,8 @@ public class ScaleDownConfiguration implements Serializable {
private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();
public List<String> getConnectors() {
return connectors;
}
@ -83,4 +85,13 @@ public class ScaleDownConfiguration implements Serializable {
this.enabled = enabled;
return this;
}
public Boolean isCleanupSfQueue() {
return this.cleanupSfQueue;
}
public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
this.cleanupSfQueue = cleanupSfQueue;
return this;
}
}

View File

@ -1578,6 +1578,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Element scaleDownElement = (Element) scaleDownNode.item(0);
scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));
scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));
NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");
@ -1791,8 +1793,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
String scaleDownConnector = e.getAttribute("scale-down-connector");
String discoveryGroupName = null;
List<String> staticConnectorNames = new ArrayList<>();

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
@ -76,6 +77,7 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
@ -252,6 +254,10 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
packet = new ScaleDownAnnounceMessage();
break;
}
case SCALEDOWN_ANNOUNCEMENT_V2: {
packet = new ScaleDownAnnounceMessageV2();
break;
}
default: {
packet = super.decode(packetType, connection);
}

View File

@ -22,13 +22,17 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
public class ScaleDownAnnounceMessage extends PacketImpl {
private SimpleString targetNodeId;
private SimpleString scaledDownNodeId;
protected SimpleString targetNodeId;
protected SimpleString scaledDownNodeId;
public ScaleDownAnnounceMessage() {
super(SCALEDOWN_ANNOUNCEMENT);
}
public ScaleDownAnnounceMessage(byte type) {
super(type);
}
public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
super(SCALEDOWN_ANNOUNCEMENT);
this.targetNodeId = targetNodeId;

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.SimpleString;
public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {
public ScaleDownAnnounceMessageV2() {
super(SCALEDOWN_ANNOUNCEMENT_V2);
}
public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
this();
this.targetNodeId = targetNodeId;
this.scaledDownNodeId = scaledDownNodeId;
}
}

View File

@ -450,4 +450,5 @@ public interface Queue extends Bindable,CriticalComponent {
}
boolean internalDelete();
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
@ -96,4 +97,11 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
* @return
*/
BridgeMetrics getBridgeMetrics(String nodeId);
/**
* Remove the store-and-forward queue after scale down
*/
void removeSfQueue(SimpleString scaledDownNodeId);
void removeSfQueue(Queue queue);
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -195,8 +196,10 @@ public class ClusterControl implements AutoCloseable {
return requestBackup(backupRequestMessage);
}
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) {
ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
clusterChannel.send(announceMessage);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -400,12 +401,19 @@ public class ClusterController implements ActiveMQComponent {
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
//we don't really need to check as it should always be true
if (server.getNodeID().equals(message.getTargetNodeId())) {
server.addScaledDownNode(message.getScaledDownNodeId());
}
if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
ClusterManager clusterManager = ClusterController.this.server.getClusterManager();
Set<ClusterConnection> ccs = clusterManager.getClusterConnections();
for (ClusterConnection cc : ccs) {
cc.removeSfQueue(message.getScaledDownNodeId());
}
}
} else if (channelHandler != null) {
channelHandler.handlePacket(packet);
}

View File

@ -41,21 +41,25 @@ public class ScaleDownPolicy {
private boolean enabled;
private boolean isCleanupSfQueue;
public ScaleDownPolicy() {
}
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
this.connectors = connectors;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.isCleanupSfQueue = isCleanupSfQueue;
}
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
this.discoveryGroup = discoveryGroup;
this.groupName = groupName;
this.clusterName = clusterName;
this.enabled = enabled;
this.isCleanupSfQueue = isCleanupSfQueue;
}
public List<String> getConnectors() {
@ -124,4 +128,8 @@ public class ScaleDownPolicy {
ActiveMQServer activeMQServer) {
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
}
public boolean isCleanupSfQueue() {
return this.isCleanupSfQueue;
}
}

View File

@ -1113,6 +1113,9 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
protected void postStop() {
}
// Inner classes -------------------------------------------------
@ -1229,6 +1232,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
}
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
postStop();
}
}

View File

@ -382,6 +382,11 @@ public class ClusterConnectionBridge extends BridgeImpl {
super.nodeUP(member, last);
}
@Override
protected void postStop() {
clusterConnection.removeSfQueue(queue);
}
@Override
protected void afterConnect() throws Exception {

View File

@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
// New node - create a new flow record
final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
final SimpleString queueName = getSfQueueName(nodeID);
Binding queueBinding = postOffice.getBinding(queueName);
@ -741,6 +741,10 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
}
}
public SimpleString getSfQueueName(String nodeID) {
return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
}
@Override
public synchronized void informClusterOfBackup() {
String nodeID = server.getNodeID().toString();
@ -770,6 +774,27 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
}
@Override
public void removeSfQueue(SimpleString scaledDownNodeId) {
SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString());
Binding binding = server.getPostOffice().getBinding(sfQName);
if (binding != null) {
removeSfQueue((Queue) binding.getBindable());
}
}
@Override
public void removeSfQueue(Queue queue) {
if (queue.internalDelete()) {
try {
server.removeAddressInfo(queue.getAddress(), null);
} catch (Exception e) {
logger.debug("Failed to remove sf address: " + queue.getAddress(), e);
}
}
}
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -49,6 +50,7 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
private ActiveMQServer parentServer;
private ServerLocator locator;
private final ClusterController clusterController;
private ScaleDownPolicy scaleDownPolicy;
public BackupRecoveryJournalLoader(PostOffice postOffice,
PagingManager pagingManager,
@ -60,12 +62,14 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
Configuration configuration,
ActiveMQServer parentServer,
ServerLocatorInternal locator,
ClusterController clusterController) {
ClusterController clusterController,
ScaleDownPolicy scaleDownPolicy) {
super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
this.parentServer = parentServer;
this.locator = locator;
this.clusterController = clusterController;
this.scaleDownPolicy = scaleDownPolicy;
}
@Override
@ -87,11 +91,12 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
public void postLoad(Journal messageJournal,
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy);
}
}

View File

@ -179,6 +179,7 @@ public class LiveOnlyActivation extends Activation {
DuplicateIDCache duplicateIDCache = activeMQServer.getPostOffice().getDuplicateIDCache(address);
duplicateIDMap.put(address, duplicateIDCache.getMap());
}
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null);
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null, this.liveOnlyPolicy.getScaleDownPolicy());
}
}

View File

@ -313,6 +313,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile long ringSize;
private Boolean removeSf;
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@ -2534,6 +2536,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void setInternalQueue(boolean internalQueue) {
this.internalQueue = internalQueue;
this.removeSf = null;
}
// Public
@ -3461,6 +3464,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
/**
* Delete the store and forward queue
* Only the second caller (if there is one) of this method does the actual deletion.
* The logic makes sure the sf queue is deleted only after bridge is stopped.
*/
@Override
public synchronized boolean internalDelete() {
if (this.isInternalQueue()) {
if (removeSf == null) {
removeSf = false;
} else if (removeSf == false) {
try {
deleteQueue();
removeSf = true;
return true;
} catch (Exception e) {
logger.debug("Error removing sf queue " + getName(), e);
}
}
}
return false;
}
private boolean checkExpired(final MessageReference reference) {
try {
if (reference.getMessage().isExpired()) {

View File

@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@ -91,14 +92,15 @@ public class ScaleDownHandler {
ResourceManager resourceManager,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
SimpleString managementAddress,
SimpleString targetNodeId) throws Exception {
SimpleString targetNodeId,
ScaleDownPolicy scaleDownPolicy) throws Exception {
ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
clusterControl.authorize();
long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num);
scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue());
return num;
}

View File

@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends Activation {
Configuration configuration,
ActiveMQServer parentServer) throws ActiveMQException {
if (replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled()) {
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), replicaPolicy.getScaleDownPolicy());
} else {
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
}

View File

@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends Activation {
Configuration configuration,
ActiveMQServer parentServer) throws ActiveMQException {
if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) {
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), sharedStoreSlavePolicy.getScaleDownPolicy());
} else {
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
}

View File

@ -2881,6 +2881,13 @@
</xsd:complexType>
</xsd:element>
</xsd:choice>
<xsd:element name="cleanup-sf-queue" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
Tells the target node whether delete the store and forward queue after scale down.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>

View File

@ -28,7 +28,9 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -272,6 +274,37 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
testParsingOverFlow("<bridges> \n" + " <bridge name=\"price-forward-bridge\"> \n" + " <queue-name>priceForwarding</queue-name> \n" + " <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + " <producer-window-size>2147483648</producer-window-size>\n" + " <static-connectors> \n" + " <connector-ref>netty</connector-ref> \n" + " </static-connectors> \n" + " </bridge> \n" + "</bridges>\n");
}
@Test
public void testParsingScaleDownConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + "<ha-policy>\n" +
" <live-only>\n" +
" <scale-down>\n" +
" <connectors>\n" +
" <connector-ref>server0-connector</connector-ref>\n" +
" </connectors>\n" +
" <cleanup-sf-queue>true</cleanup-sf-queue>\n" +
" </scale-down>\n" +
" </live-only>\n" +
"</ha-policy>\n" + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
assertTrue(scaledownCfg.isCleanupSfQueue());
List<String> connectors = scaledownCfg.getConnectors();
assertEquals(1, connectors.size());
String connector = connectors.get(0);
assertEquals("server0-connector", connector);
}
private void testParsingOverFlow(String config) throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));

View File

@ -299,6 +299,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertNotNull(lopc.getScaleDownConfiguration());
assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!");
assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1");
assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue());
for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
if (ccc.getName().equals("cluster-connection1")) {

View File

@ -888,6 +888,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public void recheckRefCount(OperationContext context) {
}
@Override
public boolean internalDelete() {
return false;
}
@Override
public void unproposed(SimpleString groupID) {

View File

@ -695,6 +695,30 @@ transactions are there for the client when it reconnects. The normal
reconnect settings apply when the client is reconnecting so these should
be high enough to deal with the time needed to scale down.
#### Automatic Deleting Store-and-Forward Queue after Scale Down
By default after the node is scaled down to a target node the internal
SF queue is not deleted. There is a boolean configuration parameter called
"cleanup-sf-queue" that can be used in case you want to delete it.
To do so you need to add this parameter to the scale-down policy and
set it to "true". For example:
```xml
<ha-policy>
<live-only>
<scale-down>
...
<cleanup-sf-queue>true</cleanup-sf-queue>
</scale-down>
</live-only>
</ha-policy>
```
With the above config in place when the scale down node is
stopped, it will send a message to the target node once the scale down
is complete. The target node will then properly delete the SF queue and its address.
## Failover Modes
Apache ActiveMQ Artemis defines two types of client failover:

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.Collection;
@RunWith(value = Parameterized.class)
public class ScaleDownRemoveSFTest extends ClusterTestBase {
@Parameterized.Parameters(name = "RemoveOption={0}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}});
}
public ScaleDownRemoveSFTest(String option) {
this.option = option;
}
private String option;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
if (!"default".equals(option)) {
scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option));
}
setupLiveServer(0, isFileStorage(), isNetty(), true);
setupLiveServer(1, isFileStorage(), isNetty(), true);
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
startServers(0, 1);
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
}
protected boolean isNetty() {
return true;
}
@Test
public void testScaleDownCheckSF() throws Exception {
final int TEST_SIZE = 2;
final String addressName = "testAddress";
final String queueName1 = "testQueue1";
// create 2 queues on each node mapped to the same address
createQueue(0, addressName, queueName1, null, true);
createQueue(1, addressName, queueName1, null, true);
// send messages to node 0
send(0, addressName, TEST_SIZE, true, null);
// consume a message from queue 1
addConsumer(1, 0, queueName1, null, false);
ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
consumers[1].getSession().commit();
Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
//check sf queue on server1 exists
ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
System.out.println("[sf queue on server 1]: " + sfQueueName);
QueueQueryResult result = servers[1].queueQuery(sfQueueName);
assertTrue(result.isExists());
// trigger scaleDown from node 0 to node 1
servers[0].stop();
addConsumer(0, 1, queueName1, null);
clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNotNull(clientMessage);
clientMessage.acknowledge();
// ensure there are no more messages on queue 1
clientMessage = consumers[0].getConsumer().receive(250);
Assert.assertNull(clientMessage);
removeConsumer(0);
//check
result = servers[1].queueQuery(sfQueueName);
AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
if ("true".equals(option)) {
assertFalse(result.isExists());
assertFalse(result2.isExists());
} else {
assertTrue(result.isExists());
assertTrue(result2.isExists());
}
}
}

View File

@ -196,6 +196,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public boolean internalDelete() {
return false;
}
@Override
public boolean isPersistedPause() {
return false;