ARTEMIS-2297 Avoiding Split Brains during replication catch up when no quorum is established
This commit is contained in:
parent
4a878b8bfa
commit
720f60ace2
|
@ -255,8 +255,13 @@ public enum ActiveMQExceptionType {
|
||||||
public ActiveMQException createException(String msg) {
|
public ActiveMQException createException(String msg) {
|
||||||
return new ActiveMQShutdownException(msg);
|
return new ActiveMQShutdownException(msg);
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
REPLICATION_TIMEOUT_ERROR(220) {
|
||||||
|
@Override
|
||||||
|
public ActiveMQException createException(String msg) {
|
||||||
|
return new ActiveMQReplicationTimeooutException(msg);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
|
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.api.core;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The initial replication synchronization with the backup did not complete within the configured
|
||||||
|
* timeout (reported with the REPLICATION_TIMEOUT_ERROR exception type).
|
||||||
|
*/
|
||||||
|
public final class ActiveMQReplicationTimeooutException extends ActiveMQException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -4486139158452585899L;
|
||||||
|
|
||||||
|
public ActiveMQReplicationTimeooutException() {
|
||||||
|
super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQReplicationTimeooutException(String msg) {
|
||||||
|
super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR, msg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -187,6 +187,14 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
||||||
channels.put(channelID, channel);
|
channels.put(channelID, channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<Interceptor> getIncomingInterceptors() {
|
||||||
|
return incomingInterceptors;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Interceptor> getOutgoingInterceptors() {
|
||||||
|
return outgoingInterceptors;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
|
public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
|
||||||
synchronized (failLock) {
|
synchronized (failLock) {
|
||||||
|
|
|
@ -185,7 +185,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
||||||
|
|
||||||
protected boolean journalLoaded = false;
|
protected boolean journalLoaded = false;
|
||||||
|
|
||||||
private final IOCriticalErrorListener ioCriticalErrorListener;
|
protected final IOCriticalErrorListener ioCriticalErrorListener;
|
||||||
|
|
||||||
protected final Configuration config;
|
protected final Configuration config;
|
||||||
|
|
||||||
|
|
|
@ -669,7 +669,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
storageManagerLock.writeLock().lock();
|
storageManagerLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
if (replicator != null) {
|
if (replicator != null) {
|
||||||
replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout);
|
replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout, ioCriticalErrorListener);
|
||||||
performCachedLargeMessageDeletes();
|
performCachedLargeMessageDeletes();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -23,13 +23,16 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
import org.apache.activemq.artemis.api.core.Message;
|
import org.apache.activemq.artemis.api.core.Message;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.config.Configuration;
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
|
||||||
|
@ -124,6 +128,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
|
|
||||||
private Executor executor;
|
private Executor executor;
|
||||||
|
|
||||||
|
private List<Interceptor> outgoingInterceptors = null;
|
||||||
|
|
||||||
|
|
||||||
// Constructors --------------------------------------------------
|
// Constructors --------------------------------------------------
|
||||||
public ReplicationEndpoint(final ActiveMQServerImpl server,
|
public ReplicationEndpoint(final ActiveMQServerImpl server,
|
||||||
IOCriticalErrorListener criticalErrorListener,
|
IOCriticalErrorListener criticalErrorListener,
|
||||||
|
@ -150,6 +157,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
journals[id] = journal;
|
journals[id] = journal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void addOutgoingInterceptorForReplication(Interceptor interceptor) {
|
||||||
|
if (outgoingInterceptors == null) {
|
||||||
|
outgoingInterceptors = new CopyOnWriteArrayList<>();
|
||||||
|
}
|
||||||
|
outgoingInterceptors.add(interceptor);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||||
*/
|
*/
|
||||||
|
@ -229,12 +243,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
logger.trace("Returning " + response);
|
logger.trace("Returning " + response);
|
||||||
}
|
}
|
||||||
|
|
||||||
channel.send(response);
|
sendResponse(response);
|
||||||
} else {
|
} else {
|
||||||
logger.trace("Response is null, ignoring response");
|
logger.trace("Response is null, ignoring response");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void sendResponse(PacketImpl response) {
|
||||||
|
channel.send(response);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param packet
|
* @param packet
|
||||||
*/
|
*/
|
||||||
|
@ -348,6 +366,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
|
|
||||||
public void setChannel(final Channel channel) {
|
public void setChannel(final Channel channel) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
|
||||||
|
if (this.channel != null && outgoingInterceptors != null) {
|
||||||
|
if (channel.getConnection() instanceof RemotingConnectionImpl) {
|
||||||
|
try {
|
||||||
|
RemotingConnectionImpl impl = (RemotingConnectionImpl) channel.getConnection();
|
||||||
|
for (Interceptor interceptor : outgoingInterceptors) {
|
||||||
|
impl.getOutgoingInterceptors().add(interceptor);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// This is code for embedded or testing, it should not affect server's semantics in case of error
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void finishSynchronization(String liveID) throws Exception {
|
private synchronized void finishSynchronization(String liveID) throws Exception {
|
||||||
|
@ -511,11 +543,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
|
final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
|
||||||
final Journal journal = journalsHolder.get(journalContent);
|
final Journal journal = journalsHolder.get(journalContent);
|
||||||
|
|
||||||
if (packet.getNodeID() != null) {
|
|
||||||
// At the start of replication, we still do not know which is the nodeID that the live uses.
|
|
||||||
// This is the point where the backup gets this information.
|
|
||||||
backupQuorum.liveIDSet(packet.getNodeID());
|
|
||||||
}
|
|
||||||
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
|
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
|
||||||
|
|
||||||
for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
|
for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
|
||||||
|
@ -523,6 +550,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
||||||
}
|
}
|
||||||
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
|
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
|
||||||
registerJournal(journalContent.typeByte, syncJournal);
|
registerJournal(journalContent.typeByte, syncJournal);
|
||||||
|
|
||||||
|
// We send a response now, to avoid a situation where we handle votes during the deactivation of the live during a failback.
|
||||||
|
sendResponse(replicationResponseMessage);
|
||||||
|
replicationResponseMessage = null;
|
||||||
|
|
||||||
|
// This needs to be done after the response is sent, to avoid voting shutting it down for any reason.
|
||||||
|
if (packet.getNodeID() != null) {
|
||||||
|
// At the start of replication, we still do not know which is the nodeID that the live uses.
|
||||||
|
// This is the point where the backup gets this information.
|
||||||
|
backupQuorum.liveIDSet(packet.getNodeID());
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
|
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
|
||||||
|
|
|
@ -33,9 +33,11 @@ import io.netty.buffer.PooledByteBufAllocator;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
|
||||||
import org.apache.activemq.artemis.api.core.Pair;
|
import org.apache.activemq.artemis.api.core.Pair;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||||
|
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||||
|
@ -69,7 +71,10 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
|
@ -107,6 +112,8 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final ActiveMQServer server;
|
||||||
|
|
||||||
private final ResponseHandler responseHandler = new ResponseHandler();
|
private final ResponseHandler responseHandler = new ResponseHandler();
|
||||||
|
|
||||||
private final Channel replicatingChannel;
|
private final Channel replicatingChannel;
|
||||||
|
@ -136,10 +143,12 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
/**
|
/**
|
||||||
* @param remotingConnection
|
* @param remotingConnection
|
||||||
*/
|
*/
|
||||||
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
public ReplicationManager(ActiveMQServer server,
|
||||||
|
CoreRemotingConnection remotingConnection,
|
||||||
final long timeout,
|
final long timeout,
|
||||||
final long initialReplicationSyncTimeout,
|
final long initialReplicationSyncTimeout,
|
||||||
final ExecutorFactory ioExecutorFactory) {
|
final ExecutorFactory ioExecutorFactory) {
|
||||||
|
this.server = server;
|
||||||
this.ioExecutorFactory = ioExecutorFactory;
|
this.ioExecutorFactory = ioExecutorFactory;
|
||||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||||
|
@ -631,7 +640,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
*
|
*
|
||||||
* @param nodeID
|
* @param nodeID
|
||||||
*/
|
*/
|
||||||
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
|
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) throws ActiveMQReplicationTimeooutException {
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
|
@ -642,8 +651,25 @@ public final class ReplicationManager implements ActiveMQComponent {
|
||||||
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
|
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
|
||||||
try {
|
try {
|
||||||
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
|
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
|
||||||
|
ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
||||||
|
|
||||||
|
if (server != null) {
|
||||||
|
try {
|
||||||
|
ClusterManager clusterManager = server.getClusterManager();
|
||||||
|
if (clusterManager != null) {
|
||||||
|
QuorumManager manager = clusterManager.getQuorumManager();
|
||||||
|
if (criticalErrorListener != null && manager != null && manager.getMaxClusterSize() <= 2) {
|
||||||
|
criticalErrorListener.onIOException(exception, exception.getMessage(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
// if an NPE or anything else happens here, continue as if nothing changed
|
||||||
|
logger.warn(e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
logger.trace("sendSynchronizationDone wasn't finished in time");
|
logger.trace("sendSynchronizationDone wasn't finished in time");
|
||||||
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
throw exception;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
logger.debug(e);
|
logger.debug(e);
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseExce
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
|
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
|
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
|
import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
|
||||||
|
@ -370,7 +371,7 @@ public interface ActiveMQMessageBundle {
|
||||||
IllegalArgumentException invalidMessageLoadBalancingType(String val);
|
IllegalArgumentException invalidMessageLoadBalancingType(String val);
|
||||||
|
|
||||||
@Message(id = 229114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 229114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
|
||||||
IllegalStateException replicationSynchronizationTimeout(long timeout);
|
ActiveMQReplicationTimeooutException replicationSynchronizationTimeout(long timeout);
|
||||||
|
|
||||||
@Message(id = 229115, value = "Colocated Policy hasn''t different type live and backup", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 229115, value = "Colocated Policy hasn''t different type live and backup", format = Message.Format.MESSAGE_FORMAT)
|
||||||
ActiveMQIllegalStateException liveBackupMismatch();
|
ActiveMQIllegalStateException liveBackupMismatch();
|
||||||
|
|
|
@ -325,6 +325,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
|
|
||||||
private CriticalAnalyzer analyzer;
|
private CriticalAnalyzer analyzer;
|
||||||
|
|
||||||
|
// This is a callback to be called right after an activation is created
|
||||||
|
private Runnable afterActivationCreated;
|
||||||
|
|
||||||
//todo think about moving this to the activation
|
//todo think about moving this to the activation
|
||||||
private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
|
private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -459,6 +462,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
// life-cycle methods
|
// life-cycle methods
|
||||||
// ----------------------------------------------------------------
|
// ----------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Callback for tests
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public Runnable getAfterActivationCreated() {
|
||||||
|
return afterActivationCreated;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Callback for tests
|
||||||
|
* @param afterActivationCreated
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public ActiveMQServerImpl setAfterActivationCreated(Runnable afterActivationCreated) {
|
||||||
|
this.afterActivationCreated = afterActivationCreated;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Can be overridden for tests
|
* Can be overridden for tests
|
||||||
*/
|
*/
|
||||||
|
@ -560,6 +581,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
if (!haPolicy.isBackup()) {
|
if (!haPolicy.isBackup()) {
|
||||||
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
|
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
|
||||||
|
|
||||||
|
if (afterActivationCreated != null) {
|
||||||
|
try {
|
||||||
|
afterActivationCreated.run();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e); // just debug, this is not supposed to happen, and if it does it will be embedded code from tests
|
||||||
|
}
|
||||||
|
|
||||||
|
afterActivationCreated = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (haPolicy.isWaitForActivation()) {
|
if (haPolicy.isWaitForActivation()) {
|
||||||
activation.run();
|
activation.run();
|
||||||
} else {
|
} else {
|
||||||
|
@ -579,6 +610,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
||||||
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
|
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (afterActivationCreated != null) {
|
||||||
|
try {
|
||||||
|
afterActivationCreated.run();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
logger.warn(e.getMessage(), e); // just debug, this is not supposed to happen, and if it does
|
||||||
|
// it will be embedded code from tests
|
||||||
|
}
|
||||||
|
afterActivationCreated = null;
|
||||||
|
}
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("starting backupActivation");
|
logger.trace("starting backupActivation");
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
||||||
ReplicationFailureListener listener = new ReplicationFailureListener();
|
ReplicationFailureListener listener = new ReplicationFailureListener();
|
||||||
rc.addCloseListener(listener);
|
rc.addCloseListener(listener);
|
||||||
rc.addFailureListener(listener);
|
rc.addFailureListener(listener);
|
||||||
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
replicationManager = new ReplicationManager(activeMQServer, rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
||||||
replicationManager.start();
|
replicationManager.start();
|
||||||
Thread t = new Thread(new Runnable() {
|
Thread t = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,220 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
|
import org.apache.activemq.artemis.core.config.Configuration;
|
||||||
|
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||||
|
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||||
|
import org.apache.activemq.artemis.junit.Wait;
|
||||||
|
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
|
||||||
|
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class ReplicaTimeoutTest extends ActiveMQTestBase {
|
||||||
|
|
||||||
|
protected ServerLocator locator;
|
||||||
|
|
||||||
|
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false))).setRetryInterval(50);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||||
|
return TransportConfigurationUtils.getInVMAcceptor(live);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
|
||||||
|
return TransportConfigurationUtils.getInVMConnector(live);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
|
||||||
|
return new InVMNodeManager(true, backupConfig.getJournalLocation());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TestableServer createTestableServer(Configuration config, NodeManager nodeManager) throws Exception {
|
||||||
|
boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
|
||||||
|
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
|
||||||
|
int topologyMembers) throws Exception {
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
|
||||||
|
|
||||||
|
locator.addClusterTopologyListener(new FailoverTestBase.LatchClusterTopologyListener(countDownLatch));
|
||||||
|
|
||||||
|
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
|
||||||
|
addSessionFactory(sf);
|
||||||
|
|
||||||
|
Assert.assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
return sf;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
|
||||||
|
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(300).setRetryInterval(100);
|
||||||
|
|
||||||
|
return createSessionFactoryAndWaitForTopology(locator, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ClientSession createSession(ClientSessionFactory sf1,
|
||||||
|
boolean autoCommitSends,
|
||||||
|
boolean autoCommitAcks) throws Exception {
|
||||||
|
return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void crash(TestableServer liveServer,
|
||||||
|
TestableServer backupServer,
|
||||||
|
ClientSession... sessions) throws Exception {
|
||||||
|
if (sessions.length > 0) {
|
||||||
|
for (ClientSession session : sessions) {
|
||||||
|
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
waitForRemoteBackup(null, 5, true, backupServer.getServer());
|
||||||
|
}
|
||||||
|
liveServer.crash(true, true, sessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test//(timeout = 120000)
|
||||||
|
public void testFailbackTimeout() throws Exception {
|
||||||
|
AssertionLoggerHandler.startCapture();
|
||||||
|
try {
|
||||||
|
TestableServer backupServer = null;
|
||||||
|
TestableServer liveServer = null;
|
||||||
|
ClientSessionFactory sf = null;
|
||||||
|
try {
|
||||||
|
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||||
|
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||||
|
final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
|
||||||
|
|
||||||
|
Configuration backupConfig = createDefaultInVMConfig();
|
||||||
|
Configuration liveConfig = createDefaultInVMConfig();
|
||||||
|
|
||||||
|
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
|
||||||
|
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
|
||||||
|
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
|
||||||
|
|
||||||
|
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).
|
||||||
|
setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
|
||||||
|
liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)).
|
||||||
|
setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false);
|
||||||
|
|
||||||
|
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
|
||||||
|
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
|
||||||
|
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
|
||||||
|
|
||||||
|
NodeManager nodeManager = createReplicatedBackupNodeManager(backupConfig);
|
||||||
|
|
||||||
|
backupServer = createTestableServer(backupConfig, nodeManager);
|
||||||
|
|
||||||
|
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
|
||||||
|
|
||||||
|
liveServer = createTestableServer(liveConfig, nodeManager);
|
||||||
|
|
||||||
|
AtomicBoolean ignoreIntercept = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
final TestableServer theBackup = backupServer;
|
||||||
|
|
||||||
|
liveServer.start();
|
||||||
|
backupServer.start();
|
||||||
|
|
||||||
|
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
|
||||||
|
|
||||||
|
sf = createSessionFactory();
|
||||||
|
|
||||||
|
ClientSession session = createSession(sf, true, true);
|
||||||
|
|
||||||
|
session.createQueue(ADDRESS, ADDRESS, null, true);
|
||||||
|
|
||||||
|
crash(liveServer, backupServer, session);
|
||||||
|
|
||||||
|
Wait.assertTrue(backupServer.getServer()::isActive);
|
||||||
|
|
||||||
|
ignoreIntercept.set(true);
|
||||||
|
|
||||||
|
((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
//theBackup.getServer().getActivation()
|
||||||
|
|
||||||
|
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) theBackup.getServer().getActivation();
|
||||||
|
activation.getReplicationEndpoint().addOutgoingInterceptorForReplication(new Interceptor() {
|
||||||
|
@Override
|
||||||
|
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||||
|
if (ignoreIntercept.get() && packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
liveServer.start();
|
||||||
|
|
||||||
|
Assert.assertTrue(Wait.waitFor(() -> AssertionLoggerHandler.findText("AMQ229114")));
|
||||||
|
|
||||||
|
Wait.assertFalse(liveServer.getServer()::isStarted);
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (sf != null) {
|
||||||
|
sf.close();
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
liveServer.getServer().stop();
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
backupServer.getServer().stop();
|
||||||
|
} catch (Throwable ignored) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
AssertionLoggerHandler.stopCapture();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
||||||
setupServer(false);
|
setupServer(false);
|
||||||
try {
|
try {
|
||||||
ClientSessionFactory sf = createSessionFactory(locator);
|
ClientSessionFactory sf = createSessionFactory(locator);
|
||||||
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
|
manager = new ReplicationManager(null, (CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
|
||||||
addActiveMQComponent(manager);
|
addActiveMQComponent(manager);
|
||||||
manager.start();
|
manager.start();
|
||||||
Assert.fail("Exception was expected");
|
Assert.fail("Exception was expected");
|
||||||
|
|
Loading…
Reference in New Issue