Revert "ARTEMIS-2462 Allow store-forward queue to be deleted afte scaledown"
This reverts commit 397cef699a
.
This commit is contained in:
parent
3cbd5a3c05
commit
d55ec37195
|
@ -430,9 +430,6 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// its possible that you only want a server to partake in scale down as a receiver, via a group. In this case set scale-down to false
|
||||
private static boolean DEFAULT_SCALE_DOWN_ENABLED = true;
|
||||
|
||||
// will the target node delete the store-and-forward queue for the scaled down node.
|
||||
private static boolean DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE = false;
|
||||
|
||||
// How long to wait for a decision
|
||||
private static int DEFAULT_GROUPING_HANDLER_TIMEOUT = 5000;
|
||||
|
||||
|
@ -1534,8 +1531,4 @@ public final class ActiveMQDefaultConfiguration {
|
|||
public static long getDefaultRetryReplicationWait() {
|
||||
return DEFAULT_RETRY_REPLICATION_WAIT;
|
||||
}
|
||||
|
||||
public static boolean isDefaultCleanupSfQueue() {
|
||||
return DEFAULT_SCALE_DOWN_CLEANUP_SF_QUEUE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,7 +277,6 @@ public class PacketImpl implements Packet {
|
|||
|
||||
public static final byte SESS_BINDINGQUERY_RESP_V4 = -15;
|
||||
|
||||
public static final byte SCALEDOWN_ANNOUNCEMENT_V2 = -16;
|
||||
|
||||
// Static --------------------------------------------------------
|
||||
|
||||
|
|
|
@ -125,9 +125,9 @@ public final class ConfigurationUtils {
|
|||
public static ScaleDownPolicy getScaleDownPolicy(ScaleDownConfiguration scaleDownConfiguration) {
|
||||
if (scaleDownConfiguration != null) {
|
||||
if (scaleDownConfiguration.getDiscoveryGroup() != null) {
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getDiscoveryGroup(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
|
||||
} else {
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled(), scaleDownConfiguration.isCleanupSfQueue());
|
||||
return new ScaleDownPolicy(scaleDownConfiguration.getConnectors(), scaleDownConfiguration.getGroupName(), scaleDownConfiguration.getClusterName(), scaleDownConfiguration.isEnabled());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
|
|
@ -34,8 +34,6 @@ public class ScaleDownConfiguration implements Serializable {
|
|||
|
||||
private boolean enabled = ActiveMQDefaultConfiguration.isDefaultScaleDownEnabled();
|
||||
|
||||
private boolean cleanupSfQueue = ActiveMQDefaultConfiguration.isDefaultCleanupSfQueue();
|
||||
|
||||
public List<String> getConnectors() {
|
||||
return connectors;
|
||||
}
|
||||
|
@ -85,13 +83,4 @@ public class ScaleDownConfiguration implements Serializable {
|
|||
this.enabled = enabled;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Boolean isCleanupSfQueue() {
|
||||
return this.cleanupSfQueue;
|
||||
}
|
||||
|
||||
public ScaleDownConfiguration setCleanupSfQueue(Boolean cleanupSfQueue) {
|
||||
this.cleanupSfQueue = cleanupSfQueue;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1579,8 +1579,6 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
Element scaleDownElement = (Element) scaleDownNode.item(0);
|
||||
|
||||
scaleDownConfiguration.setCleanupSfQueue(getBoolean(scaleDownElement, "cleanup-sf-queue", scaleDownConfiguration.isCleanupSfQueue()));
|
||||
|
||||
scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, "enabled", scaleDownConfiguration.isEnabled()));
|
||||
|
||||
NodeList discoveryGroupRef = scaleDownElement.getElementsByTagName("discovery-group-ref");
|
||||
|
@ -1794,6 +1792,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
int clusterNotificationAttempts = getInteger(e, "notification-attempts", ActiveMQDefaultConfiguration.getDefaultClusterNotificationAttempts(), Validators.GT_ZERO);
|
||||
|
||||
String scaleDownConnector = e.getAttribute("scale-down-connector");
|
||||
|
||||
String discoveryGroupName = null;
|
||||
|
||||
List<String> staticConnectorNames = new ArrayList<>();
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
|
||||
|
@ -77,7 +76,6 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REP
|
|||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.REPLICATION_RESPONSE_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_ACKNOWLEDGE;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_FLOWTOKEN;
|
||||
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
|
||||
|
@ -254,10 +252,6 @@ public class ServerPacketDecoder extends ClientPacketDecoder {
|
|||
packet = new ScaleDownAnnounceMessage();
|
||||
break;
|
||||
}
|
||||
case SCALEDOWN_ANNOUNCEMENT_V2: {
|
||||
packet = new ScaleDownAnnounceMessageV2();
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
packet = super.decode(packetType, connection);
|
||||
}
|
||||
|
|
|
@ -22,17 +22,13 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
|||
|
||||
public class ScaleDownAnnounceMessage extends PacketImpl {
|
||||
|
||||
protected SimpleString targetNodeId;
|
||||
protected SimpleString scaledDownNodeId;
|
||||
private SimpleString targetNodeId;
|
||||
private SimpleString scaledDownNodeId;
|
||||
|
||||
public ScaleDownAnnounceMessage() {
|
||||
super(SCALEDOWN_ANNOUNCEMENT);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessage(byte type) {
|
||||
super(type);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessage(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
super(SCALEDOWN_ANNOUNCEMENT);
|
||||
this.targetNodeId = targetNodeId;
|
||||
|
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class ScaleDownAnnounceMessageV2 extends ScaleDownAnnounceMessage {
|
||||
|
||||
public ScaleDownAnnounceMessageV2() {
|
||||
super(SCALEDOWN_ANNOUNCEMENT_V2);
|
||||
}
|
||||
|
||||
public ScaleDownAnnounceMessageV2(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
this();
|
||||
this.targetNodeId = targetNodeId;
|
||||
this.scaledDownNodeId = scaledDownNodeId;
|
||||
}
|
||||
}
|
|
@ -454,5 +454,4 @@ public interface Queue extends Bindable,CriticalComponent {
|
|||
|
||||
}
|
||||
|
||||
boolean internalDelete();
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
|||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
|
||||
|
||||
|
@ -97,11 +96,4 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
|
|||
* @return
|
||||
*/
|
||||
BridgeMetrics getBridgeMetrics(String nodeId);
|
||||
|
||||
/**
|
||||
* Remove the store-and-forward queue after scale down
|
||||
*/
|
||||
void removeSfQueue(SimpleString scaledDownNodeId);
|
||||
|
||||
void removeSfQueue(Queue queue);
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnoun
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessageV2;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
|
@ -196,10 +195,8 @@ public class ClusterControl implements AutoCloseable {
|
|||
return requestBackup(backupRequestMessage);
|
||||
}
|
||||
|
||||
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId, boolean isCleanupSfQueue) {
|
||||
|
||||
ScaleDownAnnounceMessage announceMessage = isCleanupSfQueue ? new ScaleDownAnnounceMessageV2(targetNodeId, scaledDownNodeId) : new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
|
||||
|
||||
public void announceScaleDown(SimpleString targetNodeId, SimpleString scaledDownNodeId) {
|
||||
ScaleDownAnnounceMessage announceMessage = new ScaleDownAnnounceMessage(targetNodeId, scaledDownNodeId);
|
||||
clusterChannel.send(announceMessage);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.cluster;
|
|||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -401,19 +400,12 @@ public class ClusterController implements ActiveMQComponent {
|
|||
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
||||
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT || packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
|
||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
||||
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
||||
//we don't really need to check as it should always be true
|
||||
if (server.getNodeID().equals(message.getTargetNodeId())) {
|
||||
server.addScaledDownNode(message.getScaledDownNodeId());
|
||||
}
|
||||
if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT_V2) {
|
||||
ClusterManager clusterManager = ClusterController.this.server.getClusterManager();
|
||||
Set<ClusterConnection> ccs = clusterManager.getClusterConnections();
|
||||
for (ClusterConnection cc : ccs) {
|
||||
cc.removeSfQueue(message.getScaledDownNodeId());
|
||||
}
|
||||
}
|
||||
} else if (channelHandler != null) {
|
||||
channelHandler.handlePacket(packet);
|
||||
}
|
||||
|
|
|
@ -41,25 +41,21 @@ public class ScaleDownPolicy {
|
|||
|
||||
private boolean enabled;
|
||||
|
||||
private boolean isCleanupSfQueue;
|
||||
|
||||
public ScaleDownPolicy() {
|
||||
}
|
||||
|
||||
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
|
||||
public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) {
|
||||
this.connectors = connectors;
|
||||
this.groupName = groupName;
|
||||
this.clusterName = clusterName;
|
||||
this.enabled = enabled;
|
||||
this.isCleanupSfQueue = isCleanupSfQueue;
|
||||
}
|
||||
|
||||
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled, boolean isCleanupSfQueue) {
|
||||
public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) {
|
||||
this.discoveryGroup = discoveryGroup;
|
||||
this.groupName = groupName;
|
||||
this.clusterName = clusterName;
|
||||
this.enabled = enabled;
|
||||
this.isCleanupSfQueue = isCleanupSfQueue;
|
||||
}
|
||||
|
||||
public List<String> getConnectors() {
|
||||
|
@ -128,8 +124,4 @@ public class ScaleDownPolicy {
|
|||
ActiveMQServer activeMQServer) {
|
||||
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
|
||||
}
|
||||
|
||||
public boolean isCleanupSfQueue() {
|
||||
return this.isCleanupSfQueue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1113,9 +1113,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
}
|
||||
|
||||
protected void postStop() {
|
||||
}
|
||||
|
||||
|
||||
// Inner classes -------------------------------------------------
|
||||
|
||||
|
@ -1232,7 +1229,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
logger.trace("Removing consumer on stopRunnable " + this + " from queue " + queue);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.bridgeStopped(name);
|
||||
postStop();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -382,11 +382,6 @@ public class ClusterConnectionBridge extends BridgeImpl {
|
|||
super.nodeUP(member, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void postStop() {
|
||||
clusterConnection.removeSfQueue(queue);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void afterConnect() throws Exception {
|
||||
|
|
|
@ -710,7 +710,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
|
||||
// New node - create a new flow record
|
||||
|
||||
final SimpleString queueName = getSfQueueName(nodeID);
|
||||
final SimpleString queueName = new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
|
||||
|
||||
Binding queueBinding = postOffice.getBinding(queueName);
|
||||
|
||||
|
@ -741,10 +741,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
}
|
||||
}
|
||||
|
||||
public SimpleString getSfQueueName(String nodeID) {
|
||||
return new SimpleString(storeAndForwardPrefix + name + "." + nodeID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void informClusterOfBackup() {
|
||||
String nodeID = server.getNodeID().toString();
|
||||
|
@ -774,27 +770,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSfQueue(SimpleString scaledDownNodeId) {
|
||||
SimpleString sfQName = getSfQueueName(scaledDownNodeId.toString());
|
||||
Binding binding = server.getPostOffice().getBinding(sfQName);
|
||||
|
||||
if (binding != null) {
|
||||
removeSfQueue((Queue) binding.getBindable());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeSfQueue(Queue queue) {
|
||||
if (queue.internalDelete()) {
|
||||
try {
|
||||
server.removeAddressInfo(queue.getAddress(), null);
|
||||
} catch (Exception e) {
|
||||
logger.debug("Failed to remove sf address: " + queue.getAddress(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewRecord(final long eventUID,
|
||||
final String targetNodeID,
|
||||
final TransportConfiguration connector,
|
||||
|
|
|
@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
|
|||
import org.apache.activemq.artemis.core.server.QueueFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
|
||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||
import org.apache.activemq.artemis.core.server.management.ManagementService;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
|
@ -50,7 +49,6 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
private ActiveMQServer parentServer;
|
||||
private ServerLocator locator;
|
||||
private final ClusterController clusterController;
|
||||
private ScaleDownPolicy scaleDownPolicy;
|
||||
|
||||
public BackupRecoveryJournalLoader(PostOffice postOffice,
|
||||
PagingManager pagingManager,
|
||||
|
@ -62,14 +60,12 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer,
|
||||
ServerLocatorInternal locator,
|
||||
ClusterController clusterController,
|
||||
ScaleDownPolicy scaleDownPolicy) {
|
||||
ClusterController clusterController) {
|
||||
|
||||
super(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration);
|
||||
this.parentServer = parentServer;
|
||||
this.locator = locator;
|
||||
this.clusterController = clusterController;
|
||||
this.scaleDownPolicy = scaleDownPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,12 +87,11 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader {
|
|||
public void postLoad(Journal messageJournal,
|
||||
ResourceManager resourceManager,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
|
||||
|
||||
ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController, parentServer.getStorageManager());
|
||||
locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
|
||||
|
||||
try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) {
|
||||
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID(), this.scaleDownPolicy);
|
||||
scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -179,7 +179,6 @@ public class LiveOnlyActivation extends Activation {
|
|||
DuplicateIDCache duplicateIDCache = activeMQServer.getPostOffice().getDuplicateIDCache(address);
|
||||
duplicateIDMap.put(address, duplicateIDCache.getMap());
|
||||
}
|
||||
|
||||
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null, this.liveOnlyPolicy.getScaleDownPolicy());
|
||||
return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, activeMQServer.getResourceManager(), duplicateIDMap, activeMQServer.getManagementService().getManagementAddress(), null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -315,8 +315,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
private volatile long ringSize;
|
||||
|
||||
private Boolean removeSf;
|
||||
|
||||
/**
|
||||
* This is to avoid multi-thread races on calculating direct delivery,
|
||||
* to guarantee ordering will be always be correct
|
||||
|
@ -2557,7 +2555,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
@Override
|
||||
public void setInternalQueue(boolean internalQueue) {
|
||||
this.internalQueue = internalQueue;
|
||||
this.removeSf = null;
|
||||
}
|
||||
|
||||
// Public
|
||||
|
@ -3487,29 +3484,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete the store and forward queue
|
||||
* Only the second caller (if there is one) of this method does the actual deletion.
|
||||
* The logic makes sure the sf queue is deleted only after bridge is stopped.
|
||||
*/
|
||||
@Override
|
||||
public synchronized boolean internalDelete() {
|
||||
if (this.isInternalQueue()) {
|
||||
if (removeSf == null) {
|
||||
removeSf = false;
|
||||
} else if (removeSf == false) {
|
||||
try {
|
||||
deleteQueue();
|
||||
removeSf = true;
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
logger.debug("Error removing sf queue " + getName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean checkExpired(final MessageReference reference) {
|
||||
try {
|
||||
if (reference.getMessage().isExpired()) {
|
||||
|
|
|
@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
|
|||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
|
||||
import org.apache.activemq.artemis.core.transaction.ResourceManager;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
|
||||
|
@ -92,15 +91,14 @@ public class ScaleDownHandler {
|
|||
ResourceManager resourceManager,
|
||||
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap,
|
||||
SimpleString managementAddress,
|
||||
SimpleString targetNodeId,
|
||||
ScaleDownPolicy scaleDownPolicy) throws Exception {
|
||||
SimpleString targetNodeId) throws Exception {
|
||||
ClusterControl clusterControl = clusterController.connectToNodeInCluster((ClientSessionFactoryInternal) sessionFactory);
|
||||
clusterControl.authorize();
|
||||
long num = scaleDownMessages(sessionFactory, targetNodeId, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
ActiveMQServerLogger.LOGGER.infoScaledDownMessages(num);
|
||||
scaleDownTransactions(sessionFactory, resourceManager, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
scaleDownDuplicateIDs(duplicateIDMap, sessionFactory, managementAddress, clusterControl.getClusterUser(), clusterControl.getClusterPassword());
|
||||
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId(), scaleDownPolicy.isCleanupSfQueue());
|
||||
clusterControl.announceScaleDown(new SimpleString(this.targetNodeId), nodeManager.getNodeId());
|
||||
return num;
|
||||
}
|
||||
|
||||
|
|
|
@ -411,7 +411,7 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer) throws ActiveMQException {
|
||||
if (replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled()) {
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), replicaPolicy.getScaleDownPolicy());
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(replicaPolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
|
||||
} else {
|
||||
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
|
||||
}
|
||||
|
|
|
@ -172,7 +172,7 @@ public final class SharedStoreBackupActivation extends Activation {
|
|||
Configuration configuration,
|
||||
ActiveMQServer parentServer) throws ActiveMQException {
|
||||
if (sharedStoreSlavePolicy.getScaleDownPolicy() != null && sharedStoreSlavePolicy.getScaleDownPolicy().isEnabled()) {
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController(), sharedStoreSlavePolicy.getScaleDownPolicy());
|
||||
return new BackupRecoveryJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer, ScaleDownPolicy.getScaleDownConnector(sharedStoreSlavePolicy.getScaleDownPolicy(), activeMQServer), activeMQServer.getClusterManager().getClusterController());
|
||||
} else {
|
||||
return super.createJournalLoader(postOffice, pagingManager, storageManager, queueFactory, nodeManager, managementService, groupingHandler, configuration, parentServer);
|
||||
}
|
||||
|
|
|
@ -2881,13 +2881,6 @@
|
|||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
</xsd:choice>
|
||||
<xsd:element name="cleanup-sf-queue" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Tells the target node whether delete the store and forward queue after scale down.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:sequence>
|
||||
<xsd:attributeGroup ref="xml:specialAttrs"/>
|
||||
</xsd:complexType>
|
||||
|
|
|
@ -28,9 +28,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
|
@ -274,37 +272,6 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
|
|||
testParsingOverFlow("<bridges> \n" + " <bridge name=\"price-forward-bridge\"> \n" + " <queue-name>priceForwarding</queue-name> \n" + " <forwarding-address>newYorkPriceUpdates</forwarding-address>\n" + " <producer-window-size>2147483648</producer-window-size>\n" + " <static-connectors> \n" + " <connector-ref>netty</connector-ref> \n" + " </static-connectors> \n" + " </bridge> \n" + "</bridges>\n");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParsingScaleDownConfig() throws Exception {
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
|
||||
String configStr = firstPart + "<ha-policy>\n" +
|
||||
" <live-only>\n" +
|
||||
" <scale-down>\n" +
|
||||
" <connectors>\n" +
|
||||
" <connector-ref>server0-connector</connector-ref>\n" +
|
||||
" </connectors>\n" +
|
||||
" <cleanup-sf-queue>true</cleanup-sf-queue>\n" +
|
||||
" </scale-down>\n" +
|
||||
" </live-only>\n" +
|
||||
"</ha-policy>\n" + lastPart;
|
||||
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
Configuration config = parser.parseMainConfig(input);
|
||||
|
||||
HAPolicyConfiguration haConfig = config.getHAPolicyConfiguration();
|
||||
assertTrue(haConfig instanceof LiveOnlyPolicyConfiguration);
|
||||
|
||||
LiveOnlyPolicyConfiguration liveOnlyCfg = (LiveOnlyPolicyConfiguration) haConfig;
|
||||
ScaleDownConfiguration scaledownCfg = liveOnlyCfg.getScaleDownConfiguration();
|
||||
assertTrue(scaledownCfg.isCleanupSfQueue());
|
||||
List<String> connectors = scaledownCfg.getConnectors();
|
||||
assertEquals(1, connectors.size());
|
||||
String connector = connectors.get(0);
|
||||
assertEquals("server0-connector", connector);
|
||||
}
|
||||
|
||||
|
||||
private void testParsingOverFlow(String config) throws Exception {
|
||||
FileConfigurationParser parser = new FileConfigurationParser();
|
||||
String firstPartWithoutAddressSettings = firstPart.substring(0, firstPart.indexOf("<address-settings"));
|
||||
|
|
|
@ -299,7 +299,6 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertNotNull(lopc.getScaleDownConfiguration());
|
||||
assertEquals(lopc.getScaleDownConfiguration().getGroupName(), "boo!");
|
||||
assertEquals(lopc.getScaleDownConfiguration().getDiscoveryGroup(), "dg1");
|
||||
assertFalse(lopc.getScaleDownConfiguration().isCleanupSfQueue());
|
||||
|
||||
for (ClusterConnectionConfiguration ccc : conf.getClusterConfigurations()) {
|
||||
if (ccc.getName().equals("cluster-connection1")) {
|
||||
|
|
|
@ -888,11 +888,6 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
|||
public void recheckRefCount(OperationContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean internalDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unproposed(SimpleString groupID) {
|
||||
|
||||
|
|
|
@ -695,30 +695,6 @@ transactions are there for the client when it reconnects. The normal
|
|||
reconnect settings apply when the client is reconnecting so these should
|
||||
be high enough to deal with the time needed to scale down.
|
||||
|
||||
#### Automatic Deleting Store-and-Forward Queue after Scale Down
|
||||
|
||||
By default after the node is scaled down to a target node the internal
|
||||
SF queue is not deleted. There is a boolean configuration parameter called
|
||||
"cleanup-sf-queue" that can be used in case you want to delete it.
|
||||
|
||||
To do so you need to add this parameter to the scale-down policy and
|
||||
set it to "true". For example:
|
||||
|
||||
```xml
|
||||
<ha-policy>
|
||||
<live-only>
|
||||
<scale-down>
|
||||
...
|
||||
<cleanup-sf-queue>true</cleanup-sf-queue>
|
||||
</scale-down>
|
||||
</live-only>
|
||||
</ha-policy>
|
||||
```
|
||||
|
||||
With the above config in place when the scale down node is
|
||||
stopped, it will send a message to the target node once the scale down
|
||||
is complete. The target node will then properly delete the SF queue and its address.
|
||||
|
||||
## Failover Modes
|
||||
|
||||
Apache ActiveMQ Artemis defines two types of client failover:
|
||||
|
|
|
@ -1,148 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.server;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.AddressQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ScaleDownRemoveSFTest extends ClusterTestBase {
|
||||
|
||||
@Parameterized.Parameters(name = "RemoveOption={0}")
|
||||
public static Collection getParameters() {
|
||||
return Arrays.asList(new Object[][]{{"default"}, {"true"}, {"false"}});
|
||||
}
|
||||
|
||||
public ScaleDownRemoveSFTest(String option) {
|
||||
this.option = option;
|
||||
}
|
||||
|
||||
private String option;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
|
||||
if (!"default".equals(option)) {
|
||||
scaleDownConfiguration.setCleanupSfQueue("true".equals(this.option));
|
||||
}
|
||||
setupLiveServer(0, isFileStorage(), isNetty(), true);
|
||||
setupLiveServer(1, isFileStorage(), isNetty(), true);
|
||||
LiveOnlyPolicyConfiguration haPolicyConfiguration0 = (LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration();
|
||||
haPolicyConfiguration0.setScaleDownConfiguration(scaleDownConfiguration);
|
||||
LiveOnlyPolicyConfiguration haPolicyConfiguration1 = (LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration();
|
||||
haPolicyConfiguration1.setScaleDownConfiguration(new ScaleDownConfiguration());
|
||||
|
||||
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
|
||||
setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
|
||||
haPolicyConfiguration0.getScaleDownConfiguration().getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||
haPolicyConfiguration1.getScaleDownConfiguration().getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors());
|
||||
servers[0].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||
servers[1].getConfiguration().getAddressesSettings().put("#", new AddressSettings().setRedistributionDelay(0));
|
||||
startServers(0, 1);
|
||||
setupSessionFactory(0, isNetty());
|
||||
setupSessionFactory(1, isNetty());
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
|
||||
protected boolean isNetty() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScaleDownCheckSF() throws Exception {
|
||||
final int TEST_SIZE = 2;
|
||||
final String addressName = "testAddress";
|
||||
final String queueName1 = "testQueue1";
|
||||
|
||||
// create 2 queues on each node mapped to the same address
|
||||
createQueue(0, addressName, queueName1, null, true);
|
||||
createQueue(1, addressName, queueName1, null, true);
|
||||
|
||||
// send messages to node 0
|
||||
send(0, addressName, TEST_SIZE, true, null);
|
||||
|
||||
// consume a message from queue 1
|
||||
addConsumer(1, 0, queueName1, null, false);
|
||||
ClientMessage clientMessage = consumers[1].getConsumer().receive(250);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
consumers[1].getSession().commit();
|
||||
|
||||
Assert.assertEquals(TEST_SIZE - 1, getMessageCount(((LocalQueueBinding) servers[0].getPostOffice().getBinding(new SimpleString(queueName1))).getQueue()));
|
||||
|
||||
//check sf queue on server1 exists
|
||||
ClusterConnectionImpl clusterconn1 = (ClusterConnectionImpl) servers[1].getClusterManager().getClusterConnection("cluster0");
|
||||
SimpleString sfQueueName = clusterconn1.getSfQueueName(servers[0].getNodeID().toString());
|
||||
|
||||
System.out.println("[sf queue on server 1]: " + sfQueueName);
|
||||
QueueQueryResult result = servers[1].queueQuery(sfQueueName);
|
||||
assertTrue(result.isExists());
|
||||
|
||||
// trigger scaleDown from node 0 to node 1
|
||||
servers[0].stop();
|
||||
|
||||
addConsumer(0, 1, queueName1, null);
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
Assert.assertNotNull(clientMessage);
|
||||
clientMessage.acknowledge();
|
||||
|
||||
// ensure there are no more messages on queue 1
|
||||
clientMessage = consumers[0].getConsumer().receive(250);
|
||||
Assert.assertNull(clientMessage);
|
||||
removeConsumer(0);
|
||||
|
||||
//check
|
||||
result = servers[1].queueQuery(sfQueueName);
|
||||
AddressQueryResult result2 = servers[1].addressQuery(sfQueueName);
|
||||
if ("true".equals(option)) {
|
||||
assertFalse(result.isExists());
|
||||
assertFalse(result2.isExists());
|
||||
} else {
|
||||
assertTrue(result.isExists());
|
||||
assertTrue(result2.isExists());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -196,11 +196,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean internalDelete() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPersistedPause() {
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue