ARTEMIS-474 Clustering fails on certain topologies

Communication between nodes will fail under certain topologies
JGroups has something called JForkChannel that could be used on container systems.
And be injected into Artemis.
For some reason that channel cannot be reused for more than one channel per VM.
And it cannot ever be closed.

I am keeping the trace logs I used to debug this issue in case anything similar to this happens again.
This commit is contained in:
Clebert Suconic 2016-04-14 17:51:18 -04:00
parent d6c7e30594
commit 630db2d69c
13 changed files with 474 additions and 173 deletions

View File

@ -16,6 +16,11 @@
*/
package org.apache.activemq.artemis.api.core;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
/**
@ -25,11 +30,49 @@ import org.jgroups.JChannel;
*/
public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory {
private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final JChannel channel;
private final String channelName;
private final JChannelManager manager;
private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>();
private static final JChannelManager singletonManager = new JChannelManager();
// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
//
// private static JChannelManager recoverManager(JChannel channel) {
// JChannelManager manager = managers.get(channel);
// if (manager == null) {
// if (isTrace) {
// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
// }
// manager = new JChannelManager();
// managers.put(channel, manager);
// }
// else {
// if (isTrace) {
// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
// }
//
// }
//
// return manager;
// }
//
public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
// TODO: use recoverManager(channel)
this(singletonManager, channel, channelName);
}
private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) {
if (isTrace) {
logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace"));
}
this.manager = manager;
this.channel = channel;
this.channelName = channelName;
}
@ -42,8 +85,17 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
return channelName;
}
@Override
public String toString() {
return "ChannelBroadcastEndpointFactory{" +
"channel=" + channel +
", channelName='" + channelName + '\'' +
", manager=" + manager +
'}';
}
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
return new JGroupsChannelBroadcastEndpoint(manager, channel, channelName).initChannel();
}
}

View File

@ -16,15 +16,11 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
import org.apache.activemq.artemis.api.core.jgroups.JGroupsReceiver;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
@ -32,6 +28,9 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final String channelName;
private boolean clientOpened;
@ -42,12 +41,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
private JGroupsReceiver receiver;
public JGroupsBroadcastEndpoint(String channelName) {
private JChannelManager manager;
public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {
this.manager = manager;
this.channelName = channelName;
}
@Override
public void broadcast(final byte[] data) throws Exception {
if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (broadcastOpened) {
org.jgroups.Message msg = new org.jgroups.Message();
@ -59,6 +62,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
@Override
public byte[] receiveBroadcast() throws Exception {
if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (clientOpened) {
return receiver.receiveBroadcast();
}
@ -69,6 +73,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
@Override
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
if (clientOpened) {
return receiver.receiveBroadcast(time, unit);
}
@ -99,7 +104,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
public abstract JChannel createChannel() throws Exception;
public JGroupsBroadcastEndpoint initChannel() throws Exception {
this.channel = JChannelManager.getJChannel(channelName, this);
this.channel = manager.getJChannel(channelName, this);
return this;
}
@ -128,146 +133,4 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
channel.close(true);
}
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
private static final class JGroupsReceiver extends ReceiverAdapter {
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
@Override
public void receive(org.jgroups.Message msg) {
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception {
return dequeue.take();
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
return dequeue.poll(time, unit);
}
}
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*/
protected static class JChannelWrapper {
int refCount = 1;
JChannel channel;
String channelName;
final List<JGroupsReceiver> receivers = new ArrayList<>();
public JChannelWrapper(String channelName, JChannel channel) throws Exception {
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
//we always add this for the first ref count
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
}
public synchronized void close(boolean closeWrappedChannel) {
refCount--;
if (refCount == 0) {
if (closeWrappedChannel) {
JChannelManager.closeChannel(this.channelName, channel);
}
else {
JChannelManager.removeChannel(this.channelName);
}
//we always remove the receiver as its no longer needed
channel.setReceiver(null);
}
}
public void removeReceiver(JGroupsReceiver receiver) {
synchronized (receivers) {
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception {
if (channel.isConnected())
return;
channel.connect(channelName);
}
public void addReceiver(JGroupsReceiver jGroupsReceiver) {
synchronized (receivers) {
receivers.add(jGroupsReceiver);
}
}
public void send(org.jgroups.Message msg) throws Exception {
channel.send(msg);
}
public JChannelWrapper addRef() {
this.refCount++;
return this;
}
@Override
public String toString() {
return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
}
}
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
*
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
protected static class JChannelManager {
private static Map<String, JChannelWrapper> channels;
public static synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
if (channels == null) {
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) {
wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
channels.put(channelName, wrapper);
return wrapper;
}
return wrapper.addRef();
}
public static synchronized void closeChannel(String channelName, JChannel channel) {
channel.setReceiver(null);
channel.disconnect();
channel.close();
JChannelWrapper wrapper = channels.remove(channelName);
if (wrapper == null) {
throw new IllegalStateException("Did not find channel " + channelName);
}
}
public static void removeChannel(String channelName) {
JChannelWrapper wrapper = channels.remove(channelName);
if (wrapper == null) {
throw new IllegalStateException("Did not find channel " + channelName);
}
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.jgroups.JChannelWrapper;
import org.jgroups.JChannel;
/**
@ -27,8 +29,8 @@ public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint {
private final JChannel jChannel;
public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception {
super(channelName);
public JGroupsChannelBroadcastEndpoint(JChannelManager manager, JChannel jChannel, final String channelName) {
super(manager, channelName);
this.jChannel = jChannel;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jgroups.JChannel;
import java.net.URL;
@ -27,8 +28,8 @@ public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
private String file;
public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception {
super(channelName);
public JGroupsFileBroadcastEndpoint(final JChannelManager manager, final String file, final String channelName) throws Exception {
super(manager, channelName);
this.file = file;
}

View File

@ -16,15 +16,20 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory {
private String file;
private String channelName;
private final JChannelManager manager = new JChannelManager();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
return new JGroupsFileBroadcastEndpoint(manager, file, channelName).initChannel();
}
public String getFile() {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
@ -26,8 +27,8 @@ public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEn
private String properties;
public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception {
super(channelName);
public JGroupsPropertiesBroadcastEndpoint(final JChannelManager manager, final String properties, final String channelName) throws Exception {
super(manager, channelName);
this.properties = properties;
}
@ -37,3 +38,4 @@ public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEn
return new JChannel(configurator);
}
}

View File

@ -16,15 +16,19 @@
*/
package org.apache.activemq.artemis.api.core;
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory {
private String properties;
private String channelName;
private final JChannelManager manager = new JChannelManager();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel();
return new JGroupsPropertiesBroadcastEndpoint(manager, properties, channelName).initChannel();
}
public String getProperties() {

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JGroupsBroadcastEndpoint;
import org.jboss.logging.Logger;
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
*
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
public class JChannelManager {
private static final Logger logger = Logger.getLogger(JChannelManager.class);
private static final boolean isTrace = logger.isTraceEnabled();
private Map<String, JChannelWrapper> channels;
public synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
if (channels == null) {
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) {
wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
channels.put(channelName, wrapper);
if (isTrace)
logger.trace("Put Channel " + channelName);
return wrapper;
}
if (isTrace)
logger.trace("Add Ref Count " + channelName);
return wrapper.addRef();
}
public synchronized void removeChannel(String channelName) {
channels.remove(channelName);
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.ArrayList;
import java.util.List;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*/
public class JChannelWrapper {
private static final Logger logger = Logger.getLogger(JChannelWrapper.class);
private static final boolean isTrace = logger.isTraceEnabled();
private boolean connected = false;
int refCount = 1;
final JChannel channel;
final String channelName;
final List<JGroupsReceiver> receivers = new ArrayList<>();
private final JChannelManager manager;
public JChannelWrapper(JChannelManager manager, final String channelName, JChannel channel) throws Exception {
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
this.manager = manager;
if (channel.getReceiver() != null) {
logger.warn("The channel already had a receiver previously!!!!", new Exception("trace"));
}
//we always add this for the first ref count
channel.setReceiver(new ReceiverAdapter() {
@Override
public void receive(org.jgroups.Message msg) {
if (isTrace) {
logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName);
}
synchronized (receivers) {
for (JGroupsReceiver r : receivers) {
r.receive(msg);
}
}
}
});
}
public JChannel getChannel() {
return channel;
}
public String getChannelName() {
return channelName;
}
public synchronized void close(boolean closeWrappedChannel) {
refCount--;
if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
if (refCount == 0) {
if (closeWrappedChannel) {
connected = false;
channel.setReceiver(null);
logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
channel.close();
}
manager.removeChannel(channelName);
}
}
public void removeReceiver(JGroupsReceiver receiver) {
if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
synchronized (receivers) {
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception {
if (isTrace) {
logger.trace(this + ":: Connecting " + channelName, new Exception("Trace"));
}
// It is important to check this otherwise we could reconnect an already connected channel
if (connected) {
return;
}
connected = true;
if (!channel.isConnected()) {
channel.connect(channelName);
}
}
public void addReceiver(JGroupsReceiver jGroupsReceiver) {
synchronized (receivers) {
if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
receivers.add(jGroupsReceiver);
}
}
public void send(org.jgroups.Message msg) throws Exception {
if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
channel.send(msg);
}
public JChannelWrapper addRef() {
this.refCount++;
if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
return this;
}
@Override
public String toString() {
return super.toString() +
"{refCount=" + refCount +
", channel=" + channel +
", channelName='" + channelName + '\'' +
", connected=" + connected +
'}';
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.jgroups;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.jboss.logging.Logger;
import org.jgroups.ReceiverAdapter;
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
public class JGroupsReceiver extends ReceiverAdapter {
private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
private static final boolean isTrace = logger.isTraceEnabled();
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
@Override
public void receive(org.jgroups.Message msg) {
if (isTrace) logger.trace("sending message " + msg);
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception {
byte[] bytes = dequeue.take();
if (isTrace) {
logBytes("receiveBroadcast()", bytes);
}
return bytes;
}
private void logBytes(String methodName, byte[] bytes) {
if (bytes != null) {
logger.trace(methodName + "::" + bytes.length + " bytes");
}
else {
logger.trace(methodName + ":: no bytes");
}
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
byte[] bytes = dequeue.poll(time, unit);
if (isTrace) {
logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
}
return bytes;
}
}

View File

@ -809,7 +809,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
// how the sendSubscription happens.
// in case this ever changes.
if (topology != null && !factory.waitForTopology(callTimeout, TimeUnit.MILLISECONDS)) {
if (factory != null) {
factory.cleanup();
}
throw ActiveMQClientMessageBundle.BUNDLE.connectionTimedOutOnReceiveTopology(discoveryGroup);
}

View File

@ -408,8 +408,9 @@ public class ClusterController implements ActiveMQComponent {
}
}
catch (ActiveMQException e) {
if (!started)
if (!started) {
return;
}
server.getScheduledPool().schedule(this, serverLocator.getRetryInterval(), TimeUnit.MILLISECONDS);
}
}

View File

@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBacku
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -53,6 +54,10 @@ import static org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothi
public final class SharedNothingBackupActivation extends Activation {
private static final Logger logger = Logger.getLogger(SharedNothingBackupActivation.class);
private static final boolean isTrace = logger.isTraceEnabled();
//this is how we act when we start as a backup
private ReplicaPolicy replicaPolicy;
@ -129,43 +134,86 @@ public final class SharedNothingBackupActivation extends Activation {
}
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
if (isTrace) {
logger.trace("Waiting on cluster connection");
}
//todo do we actually need to wait?
clusterController.awaitConnectionToReplicationCluster();
if (isTrace) {
logger.trace("Cluster Connected");
}
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
// nodeManager.startBackup();
if (isTrace) {
logger.trace("Starting backup manager");
}
activeMQServer.getBackupManager().start();
if (isTrace) {
logger.trace("Set backup Quorum");
}
replicationEndpoint.setBackupQuorum(backupQuorum);
replicationEndpoint.setExecutor(activeMQServer.getExecutorFactory().getExecutor());
EndpointConnector endpointConnector = new EndpointConnector();
if (isTrace) {
logger.trace("Starting Backup Server");
}
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(), activeMQServer.getNodeManager().getNodeId());
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
if (isTrace) logger.trace("Setting server state as started");
SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
do {
//locate the first live server to try to replicate
nodeLocator.locateNode();
if (closed) {
if (isTrace) {
logger.trace("Activation is closed, so giving up");
}
return;
}
if (isTrace) {
logger.trace("looking up the node through nodeLocator.locateNode()");
}
//locate the first live server to try to replicate
nodeLocator.locateNode();
Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
nodeID = nodeLocator.getNodeID();
if (isTrace) {
logger.trace("nodeID = " + nodeID);
}
//in a normal (non failback) scenario if we couldn't find our live server we should fail
if (!attemptFailBack) {
if (isTrace) {
logger.trace("attemptFailback=false, nodeID=" + nodeID);
}
//this shouldn't happen
if (nodeID == null)
if (nodeID == null) {
logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
throw new RuntimeException("Could not establish the connection");
}
activeMQServer.getNodeManager().setNodeID(nodeID);
}
try {
if (isTrace) {
logger.trace("Calling clusterController.connectToNodeInReplicatedCluster(" + possibleLive.getA() + ")");
}
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA());
}
catch (Exception e) {
logger.debug(e.getMessage(), e);
if (possibleLive.getB() != null) {
try {
clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB());
@ -176,6 +224,10 @@ public final class SharedNothingBackupActivation extends Activation {
}
}
if (clusterControl == null) {
if (isTrace) {
logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");
}
//its ok to retry here since we haven't started replication yet
//it may just be the server has gone since discovery
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@ -190,23 +242,43 @@ public final class SharedNothingBackupActivation extends Activation {
* process again on the next live server. All the action happens inside {@link BackupQuorum}
*/
signal = backupQuorum.waitForStatusChange();
if (isTrace) {
logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");
}
/**
* replicationEndpoint will be holding lots of open files. Make sure they get
* closed/sync'ed.
*/
ActiveMQServerImpl.stopComponent(replicationEndpoint);
// time to give up
if (!activeMQServer.isStarted() || signal == STOP)
if (!activeMQServer.isStarted() || signal == STOP) {
if (isTrace) {
logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal);
}
return;
}
// time to fail over
else if (signal == FAIL_OVER)
else if (signal == FAIL_OVER) {
if (isTrace) {
logger.trace("signal == FAIL_OVER, breaking the loop");
}
break;
}
// something has gone badly run restart from scratch
else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {
if (isTrace) {
logger.trace("Starting a new thread to stop the server!");
}
Thread startThread = new Thread(new Runnable() {
@Override
public void run() {
try {
if (isTrace) {
logger.trace("Calling activeMQServer.stop()");
}
activeMQServer.stop();
}
catch (Exception e) {
@ -227,17 +299,30 @@ public final class SharedNothingBackupActivation extends Activation {
}
} while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);
if (isTrace) {
logger.trace("Activation loop finished, current signal = " + signal);
}
activeMQServer.getClusterManager().getQuorumManager().unRegisterQuorum(backupQuorum);
if (!isRemoteBackupUpToDate()) {
logger.debug("throwing exception for !isRemoteBackupUptoDate");
throw ActiveMQMessageBundle.BUNDLE.backupServerNotInSync();
}
if (isTrace) {
logger.trace("setReplicaPolicy::" + replicaPolicy);
}
replicaPolicy.getReplicatedPolicy().setReplicaPolicy(replicaPolicy);
activeMQServer.setHAPolicy(replicaPolicy.getReplicatedPolicy());
synchronized (activeMQServer) {
if (!activeMQServer.isStarted())
if (!activeMQServer.isStarted()) {
logger.trace("Server is stopped, giving up right before becomingLive");
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
activeMQServer.getNodeManager().stopBackup();
activeMQServer.getStorageManager().start();
@ -262,6 +347,9 @@ public final class SharedNothingBackupActivation extends Activation {
}
}
catch (Exception e) {
if (isTrace) {
logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e);
}
if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())
// do not log these errors if the server is being stopped.
return;
@ -374,8 +462,10 @@ public final class SharedNothingBackupActivation extends Activation {
* @throws ActiveMQException
*/
public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws ActiveMQException {
ActiveMQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
if (isTrace) {
logger.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" +
backupUpToDate);
}
if (!activeMQServer.getHAPolicy().isBackup() || activeMQServer.getHAPolicy().isSharedStore()) {
throw new ActiveMQInternalErrorException();
}