This closes #3094
This commit is contained in:
commit
16f17853ff
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.server.cluster;
|
package org.apache.activemq.artemis.core.server.cluster;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
@ -32,14 +34,10 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupResp
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
|
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* handles the communication between a cluster node and the cluster, either the whole cluster or a specific node in the
|
* handles the communication between a cluster node and the cluster, either the whole cluster or a specific node in the
|
||||||
|
@ -64,6 +62,10 @@ public class ClusterControl implements AutoCloseable {
|
||||||
this.clusterPassword = server.getConfiguration().getClusterPassword();
|
this.clusterPassword = server.getConfiguration().getClusterPassword();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Optional<Channel> getClusterChannel() {
|
||||||
|
return Optional.ofNullable(clusterChannel);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* authorise this cluster control so it can communicate with the cluster, it will set the cluster channel on a successful
|
* authorise this cluster control so it can communicate with the cluster, it will set the cluster channel on a successful
|
||||||
* authentication.
|
* authentication.
|
||||||
|
@ -157,20 +159,6 @@ public class ClusterControl implements AutoCloseable {
|
||||||
sessionFactory.close();
|
sessionFactory.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Vote sendQuorumVote(SimpleString handler, Vote vote) {
|
|
||||||
try {
|
|
||||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(getSessionFactory().getConnection().getRemoteAddress(), vote.toString());
|
|
||||||
QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY);
|
|
||||||
QuorumVoteHandler voteHandler = server.getClusterManager().getQuorumManager().getVoteHandler(replyMessage.getHandler());
|
|
||||||
replyMessage.decodeRest(voteHandler);
|
|
||||||
Vote voteResponse = replyMessage.getVote();
|
|
||||||
ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(getSessionFactory().getConnection().getRemoteAddress(), voteResponse.toString());
|
|
||||||
return voteResponse;
|
|
||||||
} catch (ActiveMQException e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean requestReplicatedBackup(int backupSize, SimpleString nodeID) {
|
public boolean requestReplicatedBackup(int backupSize, SimpleString nodeID) {
|
||||||
BackupRequestMessage backupRequestMessage = new BackupRequestMessage(backupSize, nodeID);
|
BackupRequestMessage backupRequestMessage = new BackupRequestMessage(backupSize, nodeID);
|
||||||
return requestBackup(backupRequestMessage);
|
return requestBackup(backupRequestMessage);
|
||||||
|
|
|
@ -45,15 +45,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler;
|
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.Vote;
|
|
||||||
import org.apache.activemq.artemis.core.server.impl.Activation;
|
import org.apache.activemq.artemis.core.server.impl.Activation;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
@ -396,17 +392,7 @@ public class ClusterController implements ActiveMQComponent {
|
||||||
logger.debug("there is no acceptor used configured at the CoreProtocolManager " + this);
|
logger.debug("there is no acceptor used configured at the CoreProtocolManager " + this);
|
||||||
}
|
}
|
||||||
} else if (packet.getType() == PacketImpl.QUORUM_VOTE) {
|
} else if (packet.getType() == PacketImpl.QUORUM_VOTE) {
|
||||||
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
|
quorumManager.handleQuorumVote(clusterChannel, packet);
|
||||||
QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());
|
|
||||||
if (voteHandler == null) {
|
|
||||||
ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
quorumVoteMessage.decode(voteHandler);
|
|
||||||
ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
|
|
||||||
Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
|
||||||
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
|
||||||
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
|
||||||
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {
|
||||||
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;
|
||||||
//we don't really need to check as it should always be true
|
//we don't really need to check as it should always be true
|
||||||
|
|
|
@ -20,14 +20,23 @@ import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
||||||
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.QuorumVoteReplyMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
|
||||||
|
@ -162,6 +171,48 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean hasLive(String nodeID, int quorumSize, int voteTimeout, TimeUnit voteTimeoutUnit) {
|
||||||
|
Objects.requireNonNull(nodeID, "nodeID");
|
||||||
|
if (!started) {
|
||||||
|
throw new IllegalStateException("QuorumManager must start first");
|
||||||
|
}
|
||||||
|
int size = quorumSize == -1 ? maxClusterSize : quorumSize;
|
||||||
|
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, nodeID);
|
||||||
|
// A positive decision means that there is no live with nodeID
|
||||||
|
boolean noLive = awaitVoteComplete(quorumVote, voteTimeout, voteTimeoutUnit);
|
||||||
|
return !noLive;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isStillLive(String nodeID,
|
||||||
|
TransportConfiguration liveConnector,
|
||||||
|
int quorumSize,
|
||||||
|
int voteTimeout,
|
||||||
|
TimeUnit voteTimeoutUnit) {
|
||||||
|
Objects.requireNonNull(nodeID, "nodeID");
|
||||||
|
Objects.requireNonNull(nodeID, "liveConnector");
|
||||||
|
if (!started) {
|
||||||
|
throw new IllegalStateException("QuorumManager must start first");
|
||||||
|
}
|
||||||
|
int size = quorumSize == -1 ? maxClusterSize : quorumSize;
|
||||||
|
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, nodeID, true, liveConnector.toString());
|
||||||
|
return awaitVoteComplete(quorumVote, voteTimeout, voteTimeoutUnit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean awaitVoteComplete(QuorumVoteServerConnect quorumVote, int voteTimeout, TimeUnit voteTimeoutUnit) {
|
||||||
|
vote(quorumVote);
|
||||||
|
|
||||||
|
try {
|
||||||
|
quorumVote.await(voteTimeout, voteTimeoutUnit);
|
||||||
|
} catch (InterruptedException interruption) {
|
||||||
|
// No-op. The best the quorum can do now is to return the latest number it has
|
||||||
|
ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
|
||||||
|
}
|
||||||
|
|
||||||
|
voteComplete(quorumVote);
|
||||||
|
|
||||||
|
return quorumVote.getDecision();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns the maximum size this cluster has been.
|
* returns the maximum size this cluster has been.
|
||||||
*
|
*
|
||||||
|
@ -214,7 +265,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
* @param vote the vote
|
* @param vote the vote
|
||||||
* @return the updated vote
|
* @return the updated vote
|
||||||
*/
|
*/
|
||||||
public Vote vote(SimpleString handler, Vote vote) {
|
private Vote vote(SimpleString handler, Vote vote) {
|
||||||
QuorumVoteHandler quorumVoteHandler = handlers.get(handler);
|
QuorumVoteHandler quorumVoteHandler = handlers.get(handler);
|
||||||
return quorumVoteHandler.vote(vote);
|
return quorumVoteHandler.vote(vote);
|
||||||
}
|
}
|
||||||
|
@ -225,7 +276,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
*
|
*
|
||||||
* @param quorumVote the vote
|
* @param quorumVote the vote
|
||||||
*/
|
*/
|
||||||
public void voteComplete(QuorumVoteServerConnect quorumVote) {
|
private void voteComplete(QuorumVoteServerConnect quorumVote) {
|
||||||
VoteRunnableHolder holder = voteRunnables.remove(quorumVote);
|
VoteRunnableHolder holder = voteRunnables.remove(quorumVote);
|
||||||
if (holder != null) {
|
if (holder != null) {
|
||||||
for (VoteRunnable runnable : holder.runnables) {
|
for (VoteRunnable runnable : holder.runnables) {
|
||||||
|
@ -248,25 +299,23 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
return QuorumManager.class.getSimpleName() + "(server=" + clusterController.getIdentity() + ")";
|
return QuorumManager.class.getSimpleName() + "(server=" + clusterController.getIdentity() + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
public QuorumVoteHandler getVoteHandler(SimpleString handler) {
|
private QuorumVoteHandler getVoteHandler(SimpleString handler) {
|
||||||
return handlers.get(handler);
|
return handlers.get(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TransportConfiguration getLiveTransportConfiguration(String targetServerID) {
|
public void handleQuorumVote(Channel clusterChannel, Packet packet) {
|
||||||
TopologyMemberImpl member = clusterController.getDefaultClusterTopology().getMember(targetServerID);
|
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
|
||||||
return member != null ? member.getLive() : null;
|
QuorumVoteHandler voteHandler = getVoteHandler(quorumVoteMessage.getHandler());
|
||||||
|
if (voteHandler == null) {
|
||||||
|
ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
quorumVoteMessage.decode(voteHandler);
|
||||||
public boolean checkLive(TransportConfiguration liveTransportConfiguration) {
|
ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
|
||||||
try {
|
Vote vote = vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());
|
||||||
ClusterControl control = clusterController.connectToNode(liveTransportConfiguration);
|
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
|
||||||
control.close();
|
clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
|
||||||
return true;
|
|
||||||
} catch (Throwable t) {
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private final class VoteRunnableHolder {
|
private final class VoteRunnableHolder {
|
||||||
|
|
||||||
|
@ -289,6 +338,23 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Vote sendQuorumVote(ClusterControl clusterControl, SimpleString handler, Vote vote) {
|
||||||
|
try {
|
||||||
|
final ClientSessionFactoryInternal sessionFactory = clusterControl.getSessionFactory();
|
||||||
|
final String remoteAddress = sessionFactory.getConnection().getRemoteAddress();
|
||||||
|
ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(remoteAddress, vote.toString());
|
||||||
|
QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage) clusterControl.getClusterChannel().get()
|
||||||
|
.sendBlocking(new QuorumVoteMessage(handler, vote), PacketImpl.QUORUM_VOTE_REPLY);
|
||||||
|
QuorumVoteHandler voteHandler = getVoteHandler(replyMessage.getHandler());
|
||||||
|
replyMessage.decodeRest(voteHandler);
|
||||||
|
Vote voteResponse = replyMessage.getVote();
|
||||||
|
ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(remoteAddress, voteResponse.toString());
|
||||||
|
return voteResponse;
|
||||||
|
} catch (ActiveMQException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* this will connect to a node and then cast a vote. whether or not this vote is asked of the target node is dependent
|
* this will connect to a node and then cast a vote. whether or not this vote is asked of the target node is dependent
|
||||||
* on {@link org.apache.activemq.artemis.core.server.cluster.qourum.Vote#isRequestServerVote()}
|
* on {@link org.apache.activemq.artemis.core.server.cluster.qourum.Vote#isRequestServerVote()}
|
||||||
|
@ -318,7 +384,7 @@ public final class QuorumManager implements ClusterTopologyListener, ActiveMQCom
|
||||||
|
|
||||||
vote = quorumVote.connected();
|
vote = quorumVote.connected();
|
||||||
if (vote.isRequestServerVote()) {
|
if (vote.isRequestServerVote()) {
|
||||||
vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote);
|
vote = sendQuorumVote(clusterControl, quorumVote.getName(), vote);
|
||||||
quorumVote.vote(vote);
|
quorumVote.vote(vote);
|
||||||
} else {
|
} else {
|
||||||
quorumVote.vote(vote);
|
quorumVote.vote(vote);
|
||||||
|
|
|
@ -332,21 +332,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener
|
||||||
//nothing to do here
|
//nothing to do here
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//the live is dead so lets vote for quorum
|
if (!quorumManager.hasLive(targetServerID, size, quorumVoteWait, TimeUnit.SECONDS)) {
|
||||||
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);
|
|
||||||
|
|
||||||
quorumManager.vote(quorumVote);
|
|
||||||
|
|
||||||
try {
|
|
||||||
quorumVote.await(quorumVoteWait, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException interruption) {
|
|
||||||
// No-op. The best the quorum can do now is to return the latest number it has
|
|
||||||
ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
|
|
||||||
}
|
|
||||||
|
|
||||||
quorumManager.voteComplete(quorumVote);
|
|
||||||
|
|
||||||
if (quorumVote.getDecision()) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.client.TopologyMember;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
import org.apache.activemq.artemis.core.config.ConfigurationUtils;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
|
@ -57,7 +58,6 @@ import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
|
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteServerConnect;
|
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -223,6 +223,13 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static TransportConfiguration getLiveConnector(Configuration configuration) {
|
||||||
|
String connectorName = configuration.getClusterConfigurations().get(0).getConnectorName();
|
||||||
|
TransportConfiguration transportConfiguration = configuration.getConnectorConfigurations().get(connectorName);
|
||||||
|
assert transportConfiguration != null;
|
||||||
|
return transportConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
private final class ReplicationFailureListener implements FailureListener, CloseListener {
|
private final class ReplicationFailureListener implements FailureListener, CloseListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -253,28 +260,11 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
||||||
|
|
||||||
if (failed && replicatedPolicy.isVoteOnReplicationFailure()) {
|
if (failed && replicatedPolicy.isVoteOnReplicationFailure()) {
|
||||||
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
|
QuorumManager quorumManager = activeMQServer.getClusterManager().getQuorumManager();
|
||||||
int size = replicatedPolicy.getQuorumSize() == -1 ? quorumManager.getMaxClusterSize() : replicatedPolicy.getQuorumSize();
|
final boolean isStillLive = quorumManager.isStillLive(activeMQServer.getNodeID().toString(),
|
||||||
String liveConnector = null;
|
getLiveConnector(activeMQServer.getConfiguration()),
|
||||||
List<ClusterConnectionConfiguration> clusterConfigurations = activeMQServer.getConfiguration().getClusterConfigurations();
|
replicatedPolicy.getQuorumSize(),
|
||||||
if (clusterConfigurations != null && clusterConfigurations.size() > 0) {
|
5, TimeUnit.SECONDS);
|
||||||
ClusterConnectionConfiguration clusterConnectionConfiguration = clusterConfigurations.get(0);
|
if (!isStillLive) {
|
||||||
String connectorName = clusterConnectionConfiguration.getConnectorName();
|
|
||||||
TransportConfiguration transportConfiguration = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
|
|
||||||
liveConnector = transportConfiguration != null ? transportConfiguration.toString() : null;
|
|
||||||
}
|
|
||||||
QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, activeMQServer.getNodeID().toString(), true, liveConnector);
|
|
||||||
|
|
||||||
quorumManager.vote(quorumVote);
|
|
||||||
|
|
||||||
try {
|
|
||||||
quorumVote.await(5, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException interruption) {
|
|
||||||
// No-op. The best the quorum can do now is to return the latest number it has
|
|
||||||
}
|
|
||||||
|
|
||||||
quorumManager.voteComplete(quorumVote);
|
|
||||||
|
|
||||||
if (!quorumVote.getDecision()) {
|
|
||||||
try {
|
try {
|
||||||
Thread startThread = new Thread(new Runnable() {
|
Thread startThread = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -782,7 +782,7 @@ public class AcknowledgementTest extends JMSTestCase {
|
||||||
messageReceived = (TextMessage) consumer.receiveNoWait();
|
messageReceived = (TextMessage) consumer.receiveNoWait();
|
||||||
|
|
||||||
if (messageReceived != null) {
|
if (messageReceived != null) {
|
||||||
System.out.println("Message received " + messageReceived.getText());
|
log.debug("Message received " + messageReceived.getText());
|
||||||
}
|
}
|
||||||
Assert.assertNull(messageReceived);
|
Assert.assertNull(messageReceived);
|
||||||
|
|
||||||
|
@ -1316,8 +1316,8 @@ public class AcknowledgementTest extends JMSTestCase {
|
||||||
long time1 = consume(cf1, queue1, messageCount);
|
long time1 = consume(cf1, queue1, messageCount);
|
||||||
long time2 = consume(cf2, queue2, messageCount);
|
long time2 = consume(cf2, queue2, messageCount);
|
||||||
|
|
||||||
log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + time1);
|
log.debug("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + time1);
|
||||||
log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + time2);
|
log.debug("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + time2);
|
||||||
|
|
||||||
Assert.assertTrue(time1 < (time2 / 2));
|
Assert.assertTrue(time1 < (time2 / 2));
|
||||||
|
|
||||||
|
|
|
@ -101,12 +101,12 @@ public abstract class ActiveMQServerTestCase {
|
||||||
public TestRule watcher = new TestWatcher() {
|
public TestRule watcher = new TestWatcher() {
|
||||||
@Override
|
@Override
|
||||||
protected void starting(Description description) {
|
protected void starting(Description description) {
|
||||||
log.info(String.format("#*#*# Starting test: %s()...", description.getMethodName()));
|
log.debug(String.format("#*#*# Starting test: %s()...", description.getMethodName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void finished(Description description) {
|
protected void finished(Description description) {
|
||||||
log.info(String.format("#*#*# Finished test: %s()...", description.getMethodName()));
|
log.debug(String.format("#*#*# Finished test: %s()...", description.getMethodName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -57,7 +57,7 @@ public class AutoAckMessageListenerTest extends JMSTestCase {
|
||||||
consumer.setMessageListener(listener);
|
consumer.setMessageListener(listener);
|
||||||
|
|
||||||
// create and send messages
|
// create and send messages
|
||||||
log.info("Send and receive two message");
|
log.debug("Send and receive two message");
|
||||||
Message messageSent = session.createMessage();
|
Message messageSent = session.createMessage();
|
||||||
messageSent.setBooleanProperty("last", false);
|
messageSent.setBooleanProperty("last", false);
|
||||||
producer.send(messageSent);
|
producer.send(messageSent);
|
||||||
|
@ -67,7 +67,7 @@ public class AutoAckMessageListenerTest extends JMSTestCase {
|
||||||
conn.start();
|
conn.start();
|
||||||
|
|
||||||
// wait until message is received
|
// wait until message is received
|
||||||
log.info("waiting until message has been received by message listener...");
|
log.debug("waiting until message has been received by message listener...");
|
||||||
latch.await(10, TimeUnit.SECONDS);
|
latch.await(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// check message listener status
|
// check message listener status
|
||||||
|
@ -112,20 +112,20 @@ public class AutoAckMessageListenerTest extends JMSTestCase {
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
try {
|
try {
|
||||||
if (message.getBooleanProperty("last") == false) {
|
if (message.getBooleanProperty("last") == false) {
|
||||||
log.info("Received first message.");
|
log.debug("Received first message.");
|
||||||
if (message.getJMSRedelivered() == true) {
|
if (message.getJMSRedelivered() == true) {
|
||||||
// should not re-receive this one
|
// should not re-receive this one
|
||||||
log.info("Error: received first message twice");
|
log.debug("Error: received first message twice");
|
||||||
passed = false;
|
passed = false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (message.getJMSRedelivered() == false) {
|
if (message.getJMSRedelivered() == false) {
|
||||||
// received second message for first time
|
// received second message for first time
|
||||||
log.info("Received second message. Calling recover()");
|
log.debug("Received second message. Calling recover()");
|
||||||
session.recover();
|
session.recover();
|
||||||
} else {
|
} else {
|
||||||
// should be redelivered after recover
|
// should be redelivered after recover
|
||||||
log.info("Received second message again as expected");
|
log.debug("Received second message again as expected");
|
||||||
passed = true;
|
passed = true;
|
||||||
monitor.countDown();
|
monitor.countDown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -171,10 +171,10 @@ public class TransactedSessionTest extends JMSTestCase {
|
||||||
MessageConsumer consumer = sess.createConsumer(ActiveMQServerTestCase.topic1);
|
MessageConsumer consumer = sess.createConsumer(ActiveMQServerTestCase.topic1);
|
||||||
conn.start();
|
conn.start();
|
||||||
|
|
||||||
log.info("sending message first time");
|
log.debug("sending message first time");
|
||||||
TextMessage mSent = sess.createTextMessage("igloo");
|
TextMessage mSent = sess.createTextMessage("igloo");
|
||||||
producer.send(mSent);
|
producer.send(mSent);
|
||||||
log.info("sent message first time");
|
log.debug("sent message first time");
|
||||||
|
|
||||||
sess.commit();
|
sess.commit();
|
||||||
|
|
||||||
|
@ -183,10 +183,10 @@ public class TransactedSessionTest extends JMSTestCase {
|
||||||
|
|
||||||
sess.commit();
|
sess.commit();
|
||||||
|
|
||||||
log.info("sending message again");
|
log.debug("sending message again");
|
||||||
mSent.setText("rollback");
|
mSent.setText("rollback");
|
||||||
producer.send(mSent);
|
producer.send(mSent);
|
||||||
log.info("sent message again");
|
log.debug("sent message again");
|
||||||
|
|
||||||
sess.commit();
|
sess.commit();
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class SelectorTest extends ActiveMQServerTestCase {
|
||||||
prod.send(redMessage);
|
prod.send(redMessage);
|
||||||
prod.send(blueMessage);
|
prod.send(blueMessage);
|
||||||
|
|
||||||
log.info("sent message");
|
log.debug("sent message");
|
||||||
|
|
||||||
Message rec = redConsumer.receive();
|
Message rec = redConsumer.receive();
|
||||||
ProxyAssertSupport.assertEquals(redMessage.getJMSMessageID(), rec.getJMSMessageID());
|
ProxyAssertSupport.assertEquals(redMessage.getJMSMessageID(), rec.getJMSMessageID());
|
||||||
|
@ -84,7 +84,7 @@ public class SelectorTest extends ActiveMQServerTestCase {
|
||||||
|
|
||||||
redConsumer.close();
|
redConsumer.close();
|
||||||
|
|
||||||
log.info("closed first consumer");
|
log.debug("closed first consumer");
|
||||||
|
|
||||||
MessageConsumer universalConsumer = session.createConsumer(queue1);
|
MessageConsumer universalConsumer = session.createConsumer(queue1);
|
||||||
|
|
||||||
|
@ -191,7 +191,7 @@ public class SelectorTest extends ActiveMQServerTestCase {
|
||||||
|
|
||||||
ProxyAssertSupport.assertEquals("john", m.getStringProperty("beatle"));
|
ProxyAssertSupport.assertEquals("john", m.getStringProperty("beatle"));
|
||||||
|
|
||||||
log.info("Got message " + j);
|
log.debug("Got message " + j);
|
||||||
}
|
}
|
||||||
|
|
||||||
Message m = cons1.receiveNoWait();
|
Message m = cons1.receiveNoWait();
|
||||||
|
@ -902,38 +902,38 @@ public class SelectorTest extends ActiveMQServerTestCase {
|
||||||
tm.setText("1");
|
tm.setText("1");
|
||||||
tm.setStringProperty("PROP1", "VALUE1");
|
tm.setStringProperty("PROP1", "VALUE1");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
|
|
||||||
tm = session.createTextMessage();
|
tm = session.createTextMessage();
|
||||||
tm.setText("2");
|
tm.setText("2");
|
||||||
tm.setStringProperty("PROP1", "VALUE1");
|
tm.setStringProperty("PROP1", "VALUE1");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
|
|
||||||
tm = session.createTextMessage();
|
tm = session.createTextMessage();
|
||||||
tm.setText("3");
|
tm.setText("3");
|
||||||
tm.setStringProperty("PROP2", "VALUE2");
|
tm.setStringProperty("PROP2", "VALUE2");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
|
|
||||||
tm = session.createTextMessage();
|
tm = session.createTextMessage();
|
||||||
tm.setText("4");
|
tm.setText("4");
|
||||||
tm.setStringProperty("PROP2", "VALUE2");
|
tm.setStringProperty("PROP2", "VALUE2");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
|
|
||||||
tm = session.createTextMessage();
|
tm = session.createTextMessage();
|
||||||
tm.setText("5");
|
tm.setText("5");
|
||||||
tm.setStringProperty("PROP1", "VALUE1");
|
tm.setStringProperty("PROP1", "VALUE1");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
|
|
||||||
tm = session.createTextMessage();
|
tm = session.createTextMessage();
|
||||||
tm.setText("6");
|
tm.setText("6");
|
||||||
tm.setStringProperty("PROP1", "VALUE1");
|
tm.setStringProperty("PROP1", "VALUE1");
|
||||||
tm.setStringProperty("PROP2", "VALUE2");
|
tm.setStringProperty("PROP2", "VALUE2");
|
||||||
msgProducer.send(tm);
|
msgProducer.send(tm);
|
||||||
System.out.println("Sent message with id [" + tm.getJMSMessageID() + "]");
|
log.debug("Sent message with id [" + tm.getJMSMessageID() + "]");
|
||||||
msgProducer.close();
|
msgProducer.close();
|
||||||
msgProducer = null;
|
msgProducer = null;
|
||||||
|
|
||||||
|
|
|
@ -93,14 +93,14 @@ public class ServerManagement {
|
||||||
" has not been created or has already been killed, so it cannot be killed");
|
" has not been created or has already been killed, so it cannot be killed");
|
||||||
} else {
|
} else {
|
||||||
Server server = ServerManagement.servers.get(i);
|
Server server = ServerManagement.servers.get(i);
|
||||||
ServerManagement.log.info("invoking kill() on server " + i);
|
ServerManagement.log.debug("invoking kill() on server " + i);
|
||||||
try {
|
try {
|
||||||
server.kill();
|
server.kill();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
// This is likely to throw an exception since the server dies before the response is received
|
// This is likely to throw an exception since the server dies before the response is received
|
||||||
}
|
}
|
||||||
|
|
||||||
ServerManagement.log.info("Waiting for server to die");
|
ServerManagement.log.debug("Waiting for server to die");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -114,7 +114,7 @@ public class ServerManagement {
|
||||||
|
|
||||||
Thread.sleep(300);
|
Thread.sleep(300);
|
||||||
|
|
||||||
ServerManagement.log.info("server " + i + " killed and dead");
|
ServerManagement.log.debug("server " + i + " killed and dead");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue