From 387fca584e7cb66656f23b29a24169bb2fb8fc67 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 30 Jun 2017 20:19:43 -0400 Subject: [PATCH] ARTEMIS-1269 replication won't finish synchronization --- .../core/client/ActiveMQClientLogger.java | 2 +- .../core/protocol/core/impl/ChannelImpl.java | 10 +++ .../core/impl/RemotingConnectionImpl.java | 2 +- .../ReplicationResponseMessageV2.java | 3 +- .../core/replication/ReplicationEndpoint.java | 74 +++++++++++++++++-- .../core/replication/ReplicationManager.java | 7 +- .../core/server/impl/ReplicationError.java | 12 ++- .../impl/SharedNothingBackupActivation.java | 34 ++++++--- .../artemis/tests/util/ActiveMQTestBase.java | 6 +- .../failover/LargeMessageFailoverTest.java | 5 +- .../ReplicatedMultipleServerFailoverTest.java | 2 +- .../util/TransportConfigurationUtils.java | 4 +- 12 files changed, 127 insertions(+), 34 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index bdb4bd1631..405ed07a93 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT) - void errorDecodingPacket(@Cause Exception e); + void errorDecodingPacket(@Cause Throwable e); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index 75c23de8bd..39bddf5d29 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -462,6 +462,10 @@ public final class ChannelImpl implements Channel { @Override public void setHandler(final ChannelHandler handler) { + if (logger.isTraceEnabled()) { + logger.trace("Setting handler on " + this + " as " + handler); + } + this.handler = handler; } @@ -521,6 +525,9 @@ public final class ChannelImpl implements Channel { @Override public void lock() { + if (logger.isTraceEnabled()) { + logger.trace("lock channel " + this); + } lock.lock(); reconnectID.incrementAndGet(); @@ -532,6 +539,9 @@ public final class ChannelImpl implements Channel { @Override public void unlock() { + if (logger.isTraceEnabled()) { + logger.trace("unlock channel " + this); + } lock.lock(); failingOver = false; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index cc1d6852b1..e0837e9ef8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -363,7 +363,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement doBufferReceived(packet); super.bufferReceived(connectionID, buffer); - } catch (Exception e) { + } catch (Throwable e) { ActiveMQClientLogger.LOGGER.errorDecodingPacket(e); throw new IllegalStateException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java index b26084bb4c..c01dd4fff7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/ReplicationResponseMessageV2.java @@ -38,8 +38,9 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa return synchronizationIsFinishedAcknowledgement; } - public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { + public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) { this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement; + return this; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java index f879aeb434..a68c3f9002 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationEndpoint.java @@ -210,7 +210,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet); response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)); } - channel.send(response); + + if (response != null) { + if (logger.isTraceEnabled()) { + logger.trace("Returning " + response); + } + + channel.send(response); + } else { + logger.trace("Response is null, ignoring response"); + } } /** @@ -332,34 +341,68 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon private void finishSynchronization(String liveID) throws Exception { if (logger.isTraceEnabled()) { - logger.trace("finishSynchronization::" + liveID); + logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID); } for (JournalContent jc : EnumSet.allOf(JournalContent.class)) { Journal journal = journalsHolder.remove(jc); + if (logger.isTraceEnabled()) { + logger.trace("getting lock on " + jc + ", journal = " + journal); + } + registerJournal(jc.typeByte, journal); journal.synchronizationLock(); try { + if (logger.isTraceEnabled()) { + logger.trace("lock acquired on " + jc); + } // files should be already in place. filesReservedForSync.remove(jc); - registerJournal(jc.typeByte, journal); + if (logger.isTraceEnabled()) { + logger.trace("stopping journal for " + jc); + } journal.stop(); + if (logger.isTraceEnabled()) { + logger.trace("starting journal for " + jc); + } journal.start(); + if (logger.isTraceEnabled()) { + logger.trace("loadAndSync " + jc); + } journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE); } finally { + if (logger.isTraceEnabled()) { + logger.trace("unlocking " + jc); + } journal.synchronizationUnlock(); } } + + if (logger.isTraceEnabled()) { + logger.trace("Sync on large messages..."); + } ByteBuffer buffer = ByteBuffer.allocate(4 * 1024); for (Entry entry : largeMessages.entrySet()) { ReplicatedLargeMessage lm = entry.getValue(); if (lm instanceof LargeServerMessageInSync) { LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm; + if (logger.isTraceEnabled()) { + logger.trace("lmSync on " + lmSync.toString()); + } lmSync.joinSyncedData(buffer); } } + if (logger.isTraceEnabled()) { + logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID); + } + journalsHolder = null; backupQuorum.liveIDSet(liveID); activation.setRemoteBackupUpToDate(); + + if (logger.isTraceEnabled()) { + logger.trace("Backup is synchronized / BACKUP-SYNC-DONE"); + } + ActiveMQServerLogger.LOGGER.backupServerSynched(server); return; } @@ -428,13 +471,28 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon if (logger.isTraceEnabled()) { logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet); } - ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); - if (!started) - return replicationResponseMessage; if (packet.isSynchronizationFinished()) { - finishSynchronization(packet.getNodeID()); - replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true); + executor.execute(() -> { + try { + // this is a long running process, we cannot block the reading thread from netty + finishSynchronization(packet.getNodeID()); + if (logger.isTraceEnabled()) { + logger.trace("returning completion on synchronization catchup"); + } + channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true)); + } catch (Exception e) { + logger.warn(e.getMessage()); + channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e))); + } + + }); + // the write will happen through an executor + return null; + } + + ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2(); + if (!started) { return replicationResponseMessage; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java index 398f4527f5..3b6f9d6253 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/replication/ReplicationManager.java @@ -356,15 +356,16 @@ public final class ReplicationManager implements ActiveMQComponent { } if (enabled) { - pendingTokens.add(repliToken); if (useExecutor) { replicationStream.execute(() -> { if (enabled) { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } }); } else { + pendingTokens.add(repliToken); flowControl(packet.expectedEncodeSize()); replicatingChannel.send(packet); } @@ -411,9 +412,9 @@ public final class ReplicationManager implements ActiveMQComponent { OperationContext ctx = pendingTokens.poll(); if (ctx == null) { - throw new IllegalStateException("Missing replication token on the queue."); + logger.warn("Missing replication token on queue"); + return; } - ctx.replicationDone(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java index 7c333a5a63..83b49c90ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationError.java @@ -22,10 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LiveNodeLocator; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.jboss.logging.Logger; /** * Stops the backup in case of an error at the start of Replication. @@ -36,11 +36,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; */ final class ReplicationError implements Interceptor { - private final ActiveMQServer server; + private static final Logger logger = Logger.getLogger(ReplicationError.class); + private LiveNodeLocator nodeLocator; - ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) { - this.server = server; + ReplicationError(LiveNodeLocator nodeLocator) { this.nodeLocator = nodeLocator; } @@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor { public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED) return true; + + if (logger.isTraceEnabled()) { + logger.trace("Received ReplicationError::" + packet); + } BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet; switch (message.getRegistrationProblem()) { case ALREADY_REPLICATING: diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index d45abe3a80..fcba00c8d9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -101,6 +101,8 @@ public final class SharedNothingBackupActivation extends Activation { @Override public void run() { try { + + logger.trace("SharedNothingBackupActivation..start"); synchronized (activeMQServer) { activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); } @@ -109,16 +111,24 @@ public final class SharedNothingBackupActivation extends Activation { activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize()); activeMQServer.getNodeManager().start(); synchronized (this) { - if (closed) + if (closed) { + logger.trace("SharedNothingBackupActivation is closed, ignoring activation!"); return; + } } boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled(); - if (!activeMQServer.initialisePart1(scalingDown)) + if (!activeMQServer.initialisePart1(scalingDown)) { + if (logger.isTraceEnabled()) { + logger.trace("could not initialize part1 " + scalingDown); + } return; + } + logger.trace("Waiting for a synchronize now..."); synchronized (this) { + logger.trace("Entered a synchronized"); if (closed) return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize()); @@ -136,16 +146,12 @@ public final class SharedNothingBackupActivation extends Activation { ClusterController clusterController = activeMQServer.getClusterManager().getClusterController(); clusterController.addClusterTopologyListenerForReplication(nodeLocator); - if (logger.isTraceEnabled()) { - logger.trace("Waiting on cluster connection"); - } - //todo do we actually need to wait? + logger.trace("Waiting on cluster connection"); clusterController.awaitConnectionToReplicationCluster(); - if (logger.isTraceEnabled()) { - logger.trace("Cluster Connected"); - } - clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator)); + logger.trace("Cluster Connected"); + + clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator)); // nodeManager.startBackup(); if (logger.isTraceEnabled()) { @@ -320,13 +326,19 @@ public final class SharedNothingBackupActivation extends Activation { return; } ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer); + logger.trace("stop backup"); activeMQServer.getNodeManager().stopBackup(); + logger.trace("start store manager"); activeMQServer.getStorageManager().start(); + logger.trace("activated"); activeMQServer.getBackupManager().activated(); if (scalingDown) { + logger.trace("Scalling down..."); activeMQServer.initialisePart2(true); } else { + logger.trace("Setting up new activation"); activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy())); + logger.trace("initialize part 2"); activeMQServer.initialisePart2(false); if (activeMQServer.getIdentity() != null) { @@ -337,6 +349,8 @@ public final class SharedNothingBackupActivation extends Activation { } + logger.trace("completeActivation at the end"); + activeMQServer.completeActivation(); } } catch (Exception e) { diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index 1b35393eab..a95f77a256 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -534,7 +534,11 @@ public abstract class ActiveMQTestBase extends Assert { for (String c : connectors) { connectors0.add(c); } - ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0); + ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration(). + setName("cluster1").setAddress("jms").setConnectorName(connectorName). + setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1). + setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT). + setStaticConnectors(connectors0); return clusterConnectionConfiguration; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java index f192506463..8889ec5f31 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/LargeMessageFailoverTest.java @@ -18,22 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; +import org.junit.Ignore; import org.junit.Test; public class LargeMessageFailoverTest extends FailoverTest { @Override @Test + @Ignore public void testLiveAndBackupLiveComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } @Override @Test + @Ignore public void testLiveAndBackupBackupComesBackNewFactory() throws Exception { // skip test because it triggers OutOfMemoryError. - Thread.sleep(1000); } /** diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java index 38bf424358..383b371ca2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedMultipleServerFailoverTest.java @@ -136,7 +136,7 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover @Override public boolean isNetty() { - return false; + return true; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java index 472d32795e..abd08b82f9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/TransportConfigurationUtils.java @@ -86,7 +86,7 @@ public final class TransportConfigurationUtils { private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams); } @@ -102,7 +102,7 @@ public final class TransportConfigurationUtils { String name) { if (classname.contains("netty")) { Map serverParams = new HashMap<>(); - Integer port = live ? 61616 : 5545; + Integer port = live ? 61616 + server : 5545 + server; serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port); return new TransportConfiguration(classname, serverParams, name); }