This closes #1382
This commit is contained in:
commit
89a53ad015
|
@ -374,7 +374,7 @@ public interface ActiveMQClientLogger extends BasicLogger {
|
|||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214013, value = "Failed to decode packet", format = Message.Format.MESSAGE_FORMAT)
|
||||
void errorDecodingPacket(@Cause Exception e);
|
||||
void errorDecodingPacket(@Cause Throwable e);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214014, value = "Failed to execute failure listener", format = Message.Format.MESSAGE_FORMAT)
|
||||
|
|
|
@ -462,6 +462,10 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
@Override
|
||||
public void setHandler(final ChannelHandler handler) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Setting handler on " + this + " as " + handler);
|
||||
}
|
||||
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
|
@ -521,6 +525,9 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
@Override
|
||||
public void lock() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("lock channel " + this);
|
||||
}
|
||||
lock.lock();
|
||||
|
||||
reconnectID.incrementAndGet();
|
||||
|
@ -532,6 +539,9 @@ public final class ChannelImpl implements Channel {
|
|||
|
||||
@Override
|
||||
public void unlock() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("unlock channel " + this);
|
||||
}
|
||||
lock.lock();
|
||||
|
||||
failingOver = false;
|
||||
|
|
|
@ -363,7 +363,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
doBufferReceived(packet);
|
||||
|
||||
super.bufferReceived(connectionID, buffer);
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
ActiveMQClientLogger.LOGGER.errorDecodingPacket(e);
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
|
|
|
@ -38,8 +38,9 @@ public final class ReplicationResponseMessageV2 extends ReplicationResponseMessa
|
|||
return synchronizationIsFinishedAcknowledgement;
|
||||
}
|
||||
|
||||
public void setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
|
||||
public ReplicationResponseMessageV2 setSynchronizationIsFinishedAcknowledgement(boolean synchronizationIsFinishedAcknowledgement) {
|
||||
this.synchronizationIsFinishedAcknowledgement = synchronizationIsFinishedAcknowledgement;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -210,7 +210,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
ActiveMQServerLogger.LOGGER.errorHandlingReplicationPacket(e, packet);
|
||||
response = new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e));
|
||||
}
|
||||
channel.send(response);
|
||||
|
||||
if (response != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Returning " + response);
|
||||
}
|
||||
|
||||
channel.send(response);
|
||||
} else {
|
||||
logger.trace("Response is null, ignoring response");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -332,34 +341,68 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
|
||||
private void finishSynchronization(String liveID) throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("finishSynchronization::" + liveID);
|
||||
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
|
||||
}
|
||||
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
|
||||
Journal journal = journalsHolder.remove(jc);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("getting lock on " + jc + ", journal = " + journal);
|
||||
}
|
||||
registerJournal(jc.typeByte, journal);
|
||||
journal.synchronizationLock();
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("lock acquired on " + jc);
|
||||
}
|
||||
// files should be already in place.
|
||||
filesReservedForSync.remove(jc);
|
||||
registerJournal(jc.typeByte, journal);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("stopping journal for " + jc);
|
||||
}
|
||||
journal.stop();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("starting journal for " + jc);
|
||||
}
|
||||
journal.start();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("loadAndSync " + jc);
|
||||
}
|
||||
journal.loadSyncOnly(JournalState.SYNCING_UP_TO_DATE);
|
||||
} finally {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("unlocking " + jc);
|
||||
}
|
||||
journal.synchronizationUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Sync on large messages...");
|
||||
}
|
||||
ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
|
||||
for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet()) {
|
||||
ReplicatedLargeMessage lm = entry.getValue();
|
||||
if (lm instanceof LargeServerMessageInSync) {
|
||||
LargeServerMessageInSync lmSync = (LargeServerMessageInSync) lm;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("lmSync on " + lmSync.toString());
|
||||
}
|
||||
lmSync.joinSyncedData(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("setRemoteBackupUpToDate and liveIDSet for " + liveID);
|
||||
}
|
||||
|
||||
journalsHolder = null;
|
||||
backupQuorum.liveIDSet(liveID);
|
||||
activation.setRemoteBackupUpToDate();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
|
||||
}
|
||||
|
||||
ActiveMQServerLogger.LOGGER.backupServerSynched(server);
|
||||
return;
|
||||
}
|
||||
|
@ -428,13 +471,28 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("handleStartReplicationSynchronization:: nodeID = " + packet);
|
||||
}
|
||||
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
|
||||
if (!started)
|
||||
return replicationResponseMessage;
|
||||
|
||||
if (packet.isSynchronizationFinished()) {
|
||||
finishSynchronization(packet.getNodeID());
|
||||
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
// this is a long running process, we cannot block the reading thread from netty
|
||||
finishSynchronization(packet.getNodeID());
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("returning completion on synchronization catchup");
|
||||
}
|
||||
channel.send(new ReplicationResponseMessageV2().setSynchronizationIsFinishedAcknowledgement(true));
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage());
|
||||
channel.send(new ActiveMQExceptionMessage(ActiveMQMessageBundle.BUNDLE.replicationUnhandledError(e)));
|
||||
}
|
||||
|
||||
});
|
||||
// the write will happen through an executor
|
||||
return null;
|
||||
}
|
||||
|
||||
ReplicationResponseMessageV2 replicationResponseMessage = new ReplicationResponseMessageV2();
|
||||
if (!started) {
|
||||
return replicationResponseMessage;
|
||||
}
|
||||
|
||||
|
|
|
@ -356,15 +356,16 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
if (enabled) {
|
||||
pendingTokens.add(repliToken);
|
||||
if (useExecutor) {
|
||||
replicationStream.execute(() -> {
|
||||
if (enabled) {
|
||||
pendingTokens.add(repliToken);
|
||||
flowControl(packet.expectedEncodeSize());
|
||||
replicatingChannel.send(packet);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
pendingTokens.add(repliToken);
|
||||
flowControl(packet.expectedEncodeSize());
|
||||
replicatingChannel.send(packet);
|
||||
}
|
||||
|
@ -411,9 +412,9 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
OperationContext ctx = pendingTokens.poll();
|
||||
|
||||
if (ctx == null) {
|
||||
throw new IllegalStateException("Missing replication token on the queue.");
|
||||
logger.warn("Missing replication token on queue");
|
||||
return;
|
||||
}
|
||||
|
||||
ctx.replicationDone();
|
||||
}
|
||||
|
||||
|
|
|
@ -22,10 +22,10 @@ import org.apache.activemq.artemis.api.core.Interceptor;
|
|||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.LiveNodeLocator;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Stops the backup in case of an error at the start of Replication.
|
||||
|
@ -36,11 +36,11 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
|||
*/
|
||||
final class ReplicationError implements Interceptor {
|
||||
|
||||
private final ActiveMQServer server;
|
||||
private static final Logger logger = Logger.getLogger(ReplicationError.class);
|
||||
|
||||
private LiveNodeLocator nodeLocator;
|
||||
|
||||
ReplicationError(ActiveMQServer server, LiveNodeLocator nodeLocator) {
|
||||
this.server = server;
|
||||
ReplicationError(LiveNodeLocator nodeLocator) {
|
||||
this.nodeLocator = nodeLocator;
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,10 @@ final class ReplicationError implements Interceptor {
|
|||
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||
if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
|
||||
return true;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Received ReplicationError::" + packet);
|
||||
}
|
||||
BackupReplicationStartFailedMessage message = (BackupReplicationStartFailedMessage) packet;
|
||||
switch (message.getRegistrationProblem()) {
|
||||
case ALREADY_REPLICATING:
|
||||
|
|
|
@ -101,6 +101,8 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
logger.trace("SharedNothingBackupActivation..start");
|
||||
synchronized (activeMQServer) {
|
||||
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
|
||||
}
|
||||
|
@ -109,16 +111,24 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
activeMQServer.moveServerData(replicaPolicy.getMaxSavedReplicatedJournalsSize());
|
||||
activeMQServer.getNodeManager().start();
|
||||
synchronized (this) {
|
||||
if (closed)
|
||||
if (closed) {
|
||||
logger.trace("SharedNothingBackupActivation is closed, ignoring activation!");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
boolean scalingDown = replicaPolicy.getScaleDownPolicy() != null && replicaPolicy.getScaleDownPolicy().isEnabled();
|
||||
|
||||
if (!activeMQServer.initialisePart1(scalingDown))
|
||||
if (!activeMQServer.initialisePart1(scalingDown)) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("could not initialize part1 " + scalingDown);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
logger.trace("Waiting for a synchronize now...");
|
||||
synchronized (this) {
|
||||
logger.trace("Entered a synchronized");
|
||||
if (closed)
|
||||
return;
|
||||
backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize());
|
||||
|
@ -136,16 +146,12 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
|
||||
clusterController.addClusterTopologyListenerForReplication(nodeLocator);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Waiting on cluster connection");
|
||||
}
|
||||
//todo do we actually need to wait?
|
||||
logger.trace("Waiting on cluster connection");
|
||||
clusterController.awaitConnectionToReplicationCluster();
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Cluster Connected");
|
||||
}
|
||||
clusterController.addIncomingInterceptorForReplication(new ReplicationError(activeMQServer, nodeLocator));
|
||||
logger.trace("Cluster Connected");
|
||||
|
||||
clusterController.addIncomingInterceptorForReplication(new ReplicationError(nodeLocator));
|
||||
|
||||
// nodeManager.startBackup();
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -320,13 +326,19 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
return;
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
|
||||
logger.trace("stop backup");
|
||||
activeMQServer.getNodeManager().stopBackup();
|
||||
logger.trace("start store manager");
|
||||
activeMQServer.getStorageManager().start();
|
||||
logger.trace("activated");
|
||||
activeMQServer.getBackupManager().activated();
|
||||
if (scalingDown) {
|
||||
logger.trace("Scalling down...");
|
||||
activeMQServer.initialisePart2(true);
|
||||
} else {
|
||||
logger.trace("Setting up new activation");
|
||||
activeMQServer.setActivation(new SharedNothingLiveActivation(activeMQServer, replicaPolicy.getReplicatedPolicy()));
|
||||
logger.trace("initialize part 2");
|
||||
activeMQServer.initialisePart2(false);
|
||||
|
||||
if (activeMQServer.getIdentity() != null) {
|
||||
|
@ -337,6 +349,8 @@ public final class SharedNothingBackupActivation extends Activation {
|
|||
|
||||
}
|
||||
|
||||
logger.trace("completeActivation at the end");
|
||||
|
||||
activeMQServer.completeActivation();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -534,7 +534,11 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
for (String c : connectors) {
|
||||
connectors0.add(c);
|
||||
}
|
||||
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().setName("cluster1").setAddress("jms").setConnectorName(connectorName).setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setStaticConnectors(connectors0);
|
||||
ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
|
||||
setName("cluster1").setAddress("jms").setConnectorName(connectorName).
|
||||
setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
|
||||
setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
|
||||
setStaticConnectors(connectors0);
|
||||
|
||||
return clusterConnectionConfiguration;
|
||||
}
|
||||
|
|
|
@ -18,22 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LargeMessageFailoverTest extends FailoverTest {
|
||||
|
||||
@Override
|
||||
@Test
|
||||
@Ignore
|
||||
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception {
|
||||
// skip test because it triggers OutOfMemoryError.
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Test
|
||||
@Ignore
|
||||
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception {
|
||||
// skip test because it triggers OutOfMemoryError.
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -136,7 +136,7 @@ public class ReplicatedMultipleServerFailoverTest extends MultipleServerFailover
|
|||
|
||||
@Override
|
||||
public boolean isNetty() {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -86,7 +86,7 @@ public final class TransportConfigurationUtils {
|
|||
private static TransportConfiguration transportConfiguration(String classname, boolean live, int server) {
|
||||
if (classname.contains("netty")) {
|
||||
Map<String, Object> serverParams = new HashMap<>();
|
||||
Integer port = live ? 61616 : 5545;
|
||||
Integer port = live ? 61616 + server : 5545 + server;
|
||||
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
|
||||
return new TransportConfiguration(classname, serverParams);
|
||||
}
|
||||
|
@ -102,7 +102,7 @@ public final class TransportConfigurationUtils {
|
|||
String name) {
|
||||
if (classname.contains("netty")) {
|
||||
Map<String, Object> serverParams = new HashMap<>();
|
||||
Integer port = live ? 61616 : 5545;
|
||||
Integer port = live ? 61616 + server : 5545 + server;
|
||||
serverParams.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
|
||||
return new TransportConfiguration(classname, serverParams, name);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue