This closes #2611
This commit is contained in:
commit
e796c24585
|
@ -255,8 +255,13 @@ public enum ActiveMQExceptionType {
|
|||
public ActiveMQException createException(String msg) {
|
||||
return new ActiveMQShutdownException(msg);
|
||||
}
|
||||
},
|
||||
REPLICATION_TIMEOUT_ERROR(220) {
|
||||
@Override
|
||||
public ActiveMQException createException(String msg) {
|
||||
return new ActiveMQReplicationTimeooutException(msg);
|
||||
}
|
||||
};
|
||||
|
||||
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
|
||||
|
||||
static {
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.api.core;
|
||||
|
||||
/**
|
||||
* The creation of a session was rejected by the server (e.g. if the server is starting and has not
|
||||
* finish to be initialized.
|
||||
*/
|
||||
public final class ActiveMQReplicationTimeooutException extends ActiveMQException {
|
||||
|
||||
private static final long serialVersionUID = -4486139158452585899L;
|
||||
|
||||
public ActiveMQReplicationTimeooutException() {
|
||||
super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR);
|
||||
}
|
||||
|
||||
public ActiveMQReplicationTimeooutException(String msg) {
|
||||
super(ActiveMQExceptionType.REPLICATION_TIMEOUT_ERROR, msg);
|
||||
}
|
||||
}
|
|
@ -187,6 +187,14 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
|
|||
channels.put(channelID, channel);
|
||||
}
|
||||
|
||||
public List<Interceptor> getIncomingInterceptors() {
|
||||
return incomingInterceptors;
|
||||
}
|
||||
|
||||
public List<Interceptor> getOutgoingInterceptors() {
|
||||
return outgoingInterceptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {
|
||||
synchronized (failLock) {
|
||||
|
|
|
@ -185,7 +185,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
|
|||
|
||||
protected boolean journalLoaded = false;
|
||||
|
||||
private final IOCriticalErrorListener ioCriticalErrorListener;
|
||||
protected final IOCriticalErrorListener ioCriticalErrorListener;
|
||||
|
||||
protected final Configuration config;
|
||||
|
||||
|
|
|
@ -669,7 +669,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
storageManagerLock.writeLock().lock();
|
||||
try {
|
||||
if (replicator != null) {
|
||||
replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout);
|
||||
replicator.sendSynchronizationDone(nodeID, initialReplicationSyncTimeout, ioCriticalErrorListener);
|
||||
performCachedLargeMessageDeletes();
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -23,13 +23,16 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.FileChannel;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
|
@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.protocol.core.Channel;
|
|||
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
|
||||
|
@ -124,6 +128,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
|
||||
private Executor executor;
|
||||
|
||||
private List<Interceptor> outgoingInterceptors = null;
|
||||
|
||||
|
||||
// Constructors --------------------------------------------------
|
||||
public ReplicationEndpoint(final ActiveMQServerImpl server,
|
||||
IOCriticalErrorListener criticalErrorListener,
|
||||
|
@ -150,6 +157,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
journals[id] = journal;
|
||||
}
|
||||
|
||||
public void addOutgoingInterceptorForReplication(Interceptor interceptor) {
|
||||
if (outgoingInterceptors == null) {
|
||||
outgoingInterceptors = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
outgoingInterceptors.add(interceptor);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is for tests basically, do not use it as its API is not guaranteed for future usage.
|
||||
*/
|
||||
|
@ -229,12 +243,16 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
logger.trace("Returning " + response);
|
||||
}
|
||||
|
||||
channel.send(response);
|
||||
sendResponse(response);
|
||||
} else {
|
||||
logger.trace("Response is null, ignoring response");
|
||||
}
|
||||
}
|
||||
|
||||
protected void sendResponse(PacketImpl response) {
|
||||
channel.send(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param packet
|
||||
*/
|
||||
|
@ -348,6 +366,20 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
|
||||
public void setChannel(final Channel channel) {
|
||||
this.channel = channel;
|
||||
|
||||
if (this.channel != null && outgoingInterceptors != null) {
|
||||
if (channel.getConnection() instanceof RemotingConnectionImpl) {
|
||||
try {
|
||||
RemotingConnectionImpl impl = (RemotingConnectionImpl) channel.getConnection();
|
||||
for (Interceptor interceptor : outgoingInterceptors) {
|
||||
impl.getOutgoingInterceptors().add(interceptor);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// This is code for embedded or testing, it should not affect server's semantics in case of error
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void finishSynchronization(String liveID) throws Exception {
|
||||
|
@ -511,11 +543,6 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
final JournalContent journalContent = SyncDataType.getJournalContentType(packet.getDataType());
|
||||
final Journal journal = journalsHolder.get(journalContent);
|
||||
|
||||
if (packet.getNodeID() != null) {
|
||||
// At the start of replication, we still do not know which is the nodeID that the live uses.
|
||||
// This is the point where the backup gets this information.
|
||||
backupQuorum.liveIDSet(packet.getNodeID());
|
||||
}
|
||||
Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(journalContent);
|
||||
|
||||
for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet()) {
|
||||
|
@ -523,6 +550,18 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
|
|||
}
|
||||
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
|
||||
registerJournal(journalContent.typeByte, syncJournal);
|
||||
|
||||
// We send a response now, to avoid a situation where we handle votes during the deactivation of the live during a failback.
|
||||
sendResponse(replicationResponseMessage);
|
||||
replicationResponseMessage = null;
|
||||
|
||||
// This needs to be done after the response is sent, to avoid voting shutting it down for any reason.
|
||||
if (packet.getNodeID() != null) {
|
||||
// At the start of replication, we still do not know which is the nodeID that the live uses.
|
||||
// This is the point where the backup gets this information.
|
||||
backupQuorum.liveIDSet(packet.getNodeID());
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
throw ActiveMQMessageBundle.BUNDLE.replicationUnhandledDataType();
|
||||
|
|
|
@ -33,9 +33,11 @@ import io.netty.buffer.PooledByteBufAllocator;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.journal.EncodingSupport;
|
||||
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
|
||||
|
@ -69,7 +71,10 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.Replicatio
|
|||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
@ -107,6 +112,8 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
}
|
||||
}
|
||||
|
||||
private final ActiveMQServer server;
|
||||
|
||||
private final ResponseHandler responseHandler = new ResponseHandler();
|
||||
|
||||
private final Channel replicatingChannel;
|
||||
|
@ -136,10 +143,12 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
/**
|
||||
* @param remotingConnection
|
||||
*/
|
||||
public ReplicationManager(CoreRemotingConnection remotingConnection,
|
||||
public ReplicationManager(ActiveMQServer server,
|
||||
CoreRemotingConnection remotingConnection,
|
||||
final long timeout,
|
||||
final long initialReplicationSyncTimeout,
|
||||
final ExecutorFactory ioExecutorFactory) {
|
||||
this.server = server;
|
||||
this.ioExecutorFactory = ioExecutorFactory;
|
||||
this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
|
||||
this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
|
||||
|
@ -631,7 +640,7 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
*
|
||||
* @param nodeID
|
||||
*/
|
||||
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
|
||||
public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout, IOCriticalErrorListener criticalErrorListener) throws ActiveMQReplicationTimeooutException {
|
||||
if (enabled) {
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
@ -642,8 +651,25 @@ public final class ReplicationManager implements ActiveMQComponent {
|
|||
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
|
||||
try {
|
||||
if (!synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
|
||||
ActiveMQReplicationTimeooutException exception = ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
||||
|
||||
if (server != null) {
|
||||
try {
|
||||
ClusterManager clusterManager = server.getClusterManager();
|
||||
if (clusterManager != null) {
|
||||
QuorumManager manager = clusterManager.getQuorumManager();
|
||||
if (criticalErrorListener != null && manager != null && manager.getMaxClusterSize() <= 2) {
|
||||
criticalErrorListener.onIOException(exception, exception.getMessage(), null);
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
// if NPE or anything else, continue as nothing changed
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
logger.trace("sendSynchronizationDone wasn't finished in time");
|
||||
throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
|
||||
throw exception;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
logger.debug(e);
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseExce
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
|
||||
|
@ -370,7 +371,7 @@ public interface ActiveMQMessageBundle {
|
|||
IllegalArgumentException invalidMessageLoadBalancingType(String val);
|
||||
|
||||
@Message(id = 229114, value = "Replication synchronization process timed out after waiting {0} milliseconds", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalStateException replicationSynchronizationTimeout(long timeout);
|
||||
ActiveMQReplicationTimeooutException replicationSynchronizationTimeout(long timeout);
|
||||
|
||||
@Message(id = 229115, value = "Colocated Policy hasn''t different type live and backup", format = Message.Format.MESSAGE_FORMAT)
|
||||
ActiveMQIllegalStateException liveBackupMismatch();
|
||||
|
|
|
@ -325,6 +325,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
private CriticalAnalyzer analyzer;
|
||||
|
||||
// This is a callback to be called right before an activation is created
|
||||
private Runnable afterActivationCreated;
|
||||
|
||||
//todo think about moving this to the activation
|
||||
private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>();
|
||||
|
||||
|
@ -459,6 +462,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
// life-cycle methods
|
||||
// ----------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* A Callback for tests
|
||||
* @return
|
||||
*/
|
||||
public Runnable getAfterActivationCreated() {
|
||||
return afterActivationCreated;
|
||||
}
|
||||
|
||||
/**
|
||||
* A Callback for tests
|
||||
* @param afterActivationCreated
|
||||
* @return
|
||||
*/
|
||||
public ActiveMQServerImpl setAfterActivationCreated(Runnable afterActivationCreated) {
|
||||
this.afterActivationCreated = afterActivationCreated;
|
||||
return this;
|
||||
}
|
||||
|
||||
/*
|
||||
* Can be overridden for tests
|
||||
*/
|
||||
|
@ -560,6 +581,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
if (!haPolicy.isBackup()) {
|
||||
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
|
||||
|
||||
if (afterActivationCreated != null) {
|
||||
try {
|
||||
afterActivationCreated.run();
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e); // just debug, this is not supposed to happend, and if it does
|
||||
}
|
||||
|
||||
afterActivationCreated = null;
|
||||
}
|
||||
|
||||
if (haPolicy.isWaitForActivation()) {
|
||||
activation.run();
|
||||
} else {
|
||||
|
@ -579,6 +610,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
|
||||
}
|
||||
|
||||
if (afterActivationCreated != null) {
|
||||
try {
|
||||
afterActivationCreated.run();
|
||||
} catch (Throwable e) {
|
||||
logger.warn(e.getMessage(), e); // just debug, this is not supposed to happend, and if it does
|
||||
// it will be embedeed code from tests
|
||||
}
|
||||
afterActivationCreated = null;
|
||||
}
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("starting backupActivation");
|
||||
}
|
||||
|
|
|
@ -169,7 +169,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
|
|||
ReplicationFailureListener listener = new ReplicationFailureListener();
|
||||
rc.addCloseListener(listener);
|
||||
rc.addFailureListener(listener);
|
||||
replicationManager = new ReplicationManager(rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
||||
replicationManager = new ReplicationManager(activeMQServer, rc, clusterConnection.getCallTimeout(), replicatedPolicy.getInitialReplicationSyncTimeout(), activeMQServer.getIOExecutorFactory());
|
||||
replicationManager.start();
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,220 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.cluster.failover;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Packet;
|
||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
|
||||
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
|
||||
import org.apache.activemq.artemis.junit.Wait;
|
||||
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
|
||||
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ReplicaTimeoutTest extends ActiveMQTestBase {
|
||||
|
||||
protected ServerLocator locator;
|
||||
|
||||
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(getConnectorTransportConfiguration(true), getConnectorTransportConfiguration(false))).setRetryInterval(50);
|
||||
}
|
||||
|
||||
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
|
||||
return TransportConfigurationUtils.getInVMAcceptor(live);
|
||||
}
|
||||
|
||||
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
|
||||
return TransportConfigurationUtils.getInVMConnector(live);
|
||||
}
|
||||
|
||||
protected NodeManager createReplicatedBackupNodeManager(Configuration backupConfig) {
|
||||
return new InVMNodeManager(true, backupConfig.getJournalLocation());
|
||||
}
|
||||
|
||||
protected TestableServer createTestableServer(Configuration config, NodeManager nodeManager) throws Exception {
|
||||
boolean isBackup = config.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration || config.getHAPolicyConfiguration() instanceof SharedStoreSlavePolicyConfiguration;
|
||||
return new SameProcessActiveMQServer(createInVMFailoverServer(true, config, nodeManager, isBackup ? 2 : 1));
|
||||
}
|
||||
|
||||
protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
|
||||
int topologyMembers) throws Exception {
|
||||
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);
|
||||
|
||||
locator.addClusterTopologyListener(new FailoverTestBase.LatchClusterTopologyListener(countDownLatch));
|
||||
|
||||
ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
|
||||
addSessionFactory(sf);
|
||||
|
||||
Assert.assertTrue("topology members expected " + topologyMembers, countDownLatch.await(5, TimeUnit.SECONDS));
|
||||
return sf;
|
||||
}
|
||||
|
||||
protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
|
||||
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(300).setRetryInterval(100);
|
||||
|
||||
return createSessionFactoryAndWaitForTopology(locator, 2);
|
||||
}
|
||||
|
||||
protected ClientSession createSession(ClientSessionFactory sf1,
|
||||
boolean autoCommitSends,
|
||||
boolean autoCommitAcks) throws Exception {
|
||||
return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
|
||||
}
|
||||
|
||||
protected void crash(TestableServer liveServer,
|
||||
TestableServer backupServer,
|
||||
ClientSession... sessions) throws Exception {
|
||||
if (sessions.length > 0) {
|
||||
for (ClientSession session : sessions) {
|
||||
waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer());
|
||||
}
|
||||
} else {
|
||||
waitForRemoteBackup(null, 5, true, backupServer.getServer());
|
||||
}
|
||||
liveServer.crash(true, true, sessions);
|
||||
}
|
||||
|
||||
@Test//(timeout = 120000)
|
||||
public void testFailbackTimeout() throws Exception {
|
||||
AssertionLoggerHandler.startCapture();
|
||||
try {
|
||||
TestableServer backupServer = null;
|
||||
TestableServer liveServer = null;
|
||||
ClientSessionFactory sf = null;
|
||||
try {
|
||||
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
|
||||
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
|
||||
final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
|
||||
|
||||
Configuration backupConfig = createDefaultInVMConfig();
|
||||
Configuration liveConfig = createDefaultInVMConfig();
|
||||
|
||||
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
|
||||
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setInitialReplicationSyncTimeout(1000);
|
||||
|
||||
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).
|
||||
setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
|
||||
liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)).
|
||||
setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false);
|
||||
|
||||
((ReplicatedPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true);
|
||||
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
|
||||
|
||||
NodeManager nodeManager = createReplicatedBackupNodeManager(backupConfig);
|
||||
|
||||
backupServer = createTestableServer(backupConfig, nodeManager);
|
||||
|
||||
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
|
||||
|
||||
liveServer = createTestableServer(liveConfig, nodeManager);
|
||||
|
||||
AtomicBoolean ignoreIntercept = new AtomicBoolean(false);
|
||||
|
||||
final TestableServer theBackup = backupServer;
|
||||
|
||||
liveServer.start();
|
||||
backupServer.start();
|
||||
|
||||
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
|
||||
|
||||
sf = createSessionFactory();
|
||||
|
||||
ClientSession session = createSession(sf, true, true);
|
||||
|
||||
session.createQueue(ADDRESS, ADDRESS, null, true);
|
||||
|
||||
crash(liveServer, backupServer, session);
|
||||
|
||||
Wait.assertTrue(backupServer.getServer()::isActive);
|
||||
|
||||
ignoreIntercept.set(true);
|
||||
|
||||
((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
//theBackup.getServer().getActivation()
|
||||
|
||||
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) theBackup.getServer().getActivation();
|
||||
activation.getReplicationEndpoint().addOutgoingInterceptorForReplication(new Interceptor() {
|
||||
@Override
|
||||
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
|
||||
if (ignoreIntercept.get() && packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
liveServer.start();
|
||||
|
||||
Assert.assertTrue(Wait.waitFor(() -> AssertionLoggerHandler.findText("AMQ229114")));
|
||||
|
||||
Wait.assertFalse(liveServer.getServer()::isStarted);
|
||||
|
||||
} finally {
|
||||
if (sf != null) {
|
||||
sf.close();
|
||||
}
|
||||
try {
|
||||
liveServer.getServer().stop();
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
try {
|
||||
backupServer.getServer().stop();
|
||||
} catch (Throwable ignored) {
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
AssertionLoggerHandler.stopCapture();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -191,7 +191,7 @@ public final class ReplicationTest extends ActiveMQTestBase {
|
|||
setupServer(false);
|
||||
try {
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
manager = new ReplicationManager((CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
|
||||
manager = new ReplicationManager(null, (CoreRemotingConnection) sf.getConnection(), sf.getServerLocator().getCallTimeout(), sf.getServerLocator().getCallTimeout(), factory);
|
||||
addActiveMQComponent(manager);
|
||||
manager.start();
|
||||
Assert.fail("Exception was expected");
|
||||
|
|
Loading…
Reference in New Issue