This commit is contained in:
Clebert Suconic 2017-07-01 10:39:26 -04:00
commit c54120a47f
12 changed files with 127 additions and 34 deletions

View File

@ -369,7 +369,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
void errorDecodingPacket(@Cause Exception e);
void errorDecodingPacket(@Cause Throwable e);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)

View File

@ -457,6 +457,10 @@ public final class ChannelImpl implements Channel {
@Override
public void setHandler(final ChannelHandler handler) {
if (logger.isTraceEnabled()) {
logger.trace("Setting handler on " + this + " as " + handler);
}
this.handler = handler;
}
@ -516,6 +520,9 @@ public final class ChannelImpl implements Channel {
@Override
public void lock() {
if (logger.isTraceEnabled()) {
logger.trace("lock channel " + this);
}
lock.lock();
reconnectID.incrementAndGet();
@ -527,6 +534,9 @@ public final class ChannelImpl implements Channel {
@Override
public void unlock() {
if (logger.isTraceEnabled()) {
logger.trace("unlock channel " + this);
}
lock.lock();
failingOver = false;

View File

@ -362,7 +362,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
} catch (Exception e) {
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
}
}

View File

@ -38,8 +38,9 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
return synchronizationIsFinishedAcknowledgement;
}
public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
return this;
}
@Override

View File

@ -209,7 +209,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
}
channel.send(response);
if (response != null) {
if (logger.isTraceEnabled()) {
logger.trace("Returning " + response);
}
channel.send(response);
} else {
logger.trace("Response is null, ignoring response");
}
}
/**
@ -331,34 +340,68 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
private void finishSynchronization(String liveID) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("finishSynchronization::" + liveID);
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc);
if (logger.isTraceEnabled()) {
logger.trace("getting lock on " + jc + ", journal = " + journal);
}
registerJournal(jc.typeByte, journal);
journal.synchronizationLock();
try {
if (logger.isTraceEnabled()) {
logger.trace("lock acquired on " + jc);
}
// files should be already in place.
filesReservedForSync.remove(jc);
registerJournal(jc.typeByte, journal);
if (logger.isTraceEnabled()) {
logger.trace("stopping journal for " + jc);
}
journal.stop();
if (logger.isTraceEnabled()) {
logger.trace("starting journal for " + jc);
}
journal.start();
if (logger.isTraceEnabled()) {
logger.trace("loadAndSync " + jc);
}
journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
} finally {
if (logger.isTraceEnabled()) {
logger.trace("unlocking " + jc);
}
journal.synchronizationUnlock();
}
}
if (logger.isTraceEnabled()) {
logger.trace("Sync on large messages...");
}
ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
ReplicatedLargeMessage lm = entry.getValue();
if (lm instanceof LargeServerMessageInSync) {
LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
if (logger.isTraceEnabled()) {
logger.trace("lmSync on " + lmSync.toString());
}
lmSync.joinSyncedData(buffer);
}
}
if (logger.isTraceEnabled()) {
logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
}
journalsHolder = null;
backupQuorum.liveIDSet(liveID);
activation.setRemoteBackupUpToDate();
if (logger.isTraceEnabled()) {
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
}
ActiveMQServerLogger.LOGGER.backupServerSynched(server);
return;
}
@ -427,13 +470,28 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
if (logger.isTraceEnabled()) {
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started)
return replicationResponseMessage;
if (packet.isSynchronizationFinished()) {
finishSynchronization(packet.getNodeID());
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
executor.execute(() -> {
try {
// this is a long running process, we cannot block the reading thread from netty
finishSynchronization(packet.getNodeID());
if (logger.isTraceEnabled()) {
logger.trace("returning completion on synchronization catchup");
}
channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
} catch (Exception e) {
logger.warn(e.getMessage());
channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
}
});
// the write will happen through an executor
return null;
}
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
if (!started) {
return replicationResponseMessage;
}

View File

@ -350,15 +350,16 @@ public final class ReplicationManager implements ActiveMQComponent {
}
if (enabled) {
pendingTokens.add(repliToken);
if (useExecutor) {
replicationStream.execute(() -> {
if (enabled) {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
});
} else {
pendingTokens.add(repliToken);
flowControl(packet.expectedEncodeSize());
replicatingChannel.send(packet);
}
@ -405,9 +406,9 @@ public final class ReplicationManager implements ActiveMQComponent {
OperationContext ctx = pendingTokens.poll();
if (ctx == null) {
throw new IllegalStateException("Missing replication token on the queue.");
logger.warn("Missing replication token on queue");
return;
}
ctx.replicationDone();
}

View File

@ -22,10 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.jboss.logging.Logger;
/**
* Stops the backup in case of an error at the start of Replication.
@ -36,11 +36,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
*/
final class ReplicationError implements Interceptor {
private final ActiveMQServer server;
private static final Logger logger = Logger.getLogger(ReplicationError.class);
private LiveNodeLocator nodeLocator;
ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
this.server = server;
ReplicationError(LiveNodeLocator nodeLocator) {
this.nodeLocator = nodeLocator;
}
@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor {
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
return true;
if (logger.isTraceEnabled()) {
logger.trace("Received ReplicationError::" + packet);
}
BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
switch (message.getRegistrationProblem()) {
case ALREADY_REPLICATING:

View File

@ -101,6 +101,8 @@ public final class SharedNothingBackupActivation extends Activation {
@Override
public void run() {
try {
logger.trace("SharedNothingBackupActivation..start");
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
@ -109,16 +111,24 @@ public final class SharedNothingBackupActivation extends Activation {
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
activeMQServer.getNodeManager().start();
synchronized (this) {
if (closed)
if (closed) {
logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
return;
}
}
boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
if (!activeMQServer.initialisePart1(scalingDown))
if (!activeMQServer.initialisePart1(scalingDown)) {
if (logger.isTraceEnabled()) {
logger.trace("could not initialize part1 " + scalingDown);
}
return;
}
logger.trace("Waiting for a synchronize now...");
synchronized (this) {
logger.trace("Entered a synchronized");
if (closed)
return;
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck);
@ -136,16 +146,12 @@ public final class SharedNothingBackupActivation extends Activation {
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
if (logger.isTraceEnabled()) {
logger.trace("Waiting on cluster connection");
}
//todo do we actually need to wait?
logger.trace("Waiting on cluster connection");
clusterController.awaitConnectionToReplicationCluster();
if (logger.isTraceEnabled()) {
logger.trace("Cluster Connected");
}
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
logger.trace("Cluster Connected");
clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
// nodeManager.startBackup();
if (logger.isTraceEnabled()) {
@ -319,13 +325,19 @@ public final class SharedNothingBackupActivation extends Activation {
return;
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
logger.trace("stop backup");
activeMQServer.getNodeManager().stopBackup();
logger.trace("start store manager");
activeMQServer.getStorageManager().start();
logger.trace("activated");
activeMQServer.getBackupManager().activated();
if (scalingDown) {
logger.trace("Scalling down...");
activeMQServer.initialisePart2(true);
} else {
logger.trace("Setting up new activation");
activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
logger.trace("initialize part 2");
activeMQServer.initialisePart2(false);
if (activeMQServer.getIdentity() != null) {
@ -336,6 +348,8 @@ public final class SharedNothingBackupActivation extends Activation {
}
logger.trace("completeActivation at the end");
activeMQServer.completeActivation();
}
} catch (Exception e) {

View File

@ -531,7 +531,11 @@ public abstract class ActiveMQTestBase extends Assert {
for (String c : connectors) {
connectors0.add(c);
}
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
setStaticConnectors(connectors0);
return clusterConnectionConfiguration;
}

View File

@ -18,22 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.junit.Ignore;
import org.junit.Test;
public class LargeMessageFailoverTest extends FailoverTest {
@Override
@Test
@Ignore
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
Thread.sleep(1000);
}
@Override
@Test
@Ignore
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
// skip test because it triggers OutOfMemoryError.
Thread.sleep(1000);
}
/**

View File

@ -135,7 +135,7 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover
@Override
public boolean isNetty() {
return false;
return true;
}
@Override

View File

@ -86,7 +86,7 @@ public final class TransportConfigurationUtils {
private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
Integer port = live ? 61616 : 5545;
Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams);
}
@ -102,7 +102,7 @@ public final class TransportConfigurationUtils {
String name) {
if (classname.contains("netty")) {
Map<String, Object> serverParams = new HashMap<>();
Integer port = live ? 61616 : 5545;
Integer port = live ? 61616 + server : 5545 + server;
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
return new TransportConfiguration(classname, serverParams, name);
}