ARTEMIS-3275 Lock CORE client communication during failover retries

This commit is contained in:
Domenico Francesco Bruscino 2021-06-22 09:52:48 +02:00 committed by Clebert Suconic
parent 49e3843aa7
commit 6d2b96c79e
8 changed files with 118 additions and 36 deletions

View File

@ -632,10 +632,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
connector = null;
boolean allSessionReconnected;
HashSet<ClientSessionInternal> sessionsToFailover;
synchronized (sessions) {
sessionsToFailover = new HashSet<>(sessions);
}
for (ClientSessionInternal session : sessionsToFailover) {
session.preHandleFailover(connection);
}
boolean allSessionReconnected = false;
int failedReconnectSessionsCounter = 0;
do {
allSessionReconnected = reconnectSessions(oldConnection, reconnectAttempts, me);
allSessionReconnected = reconnectSessions(sessionsToFailover, oldConnection, reconnectAttempts, me);
if (oldConnection != null) {
oldConnection.destroy();
}
@ -644,10 +653,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
failedReconnectSessionsCounter++;
oldConnection = connection;
connection = null;
// Wait for retry when the connection is established but not all session are reconnected.
if ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && oldConnection != null) {
waitForRetry(retryInterval);
}
}
}
while ((reconnectAttempts == -1 || failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected);
for (ClientSessionInternal session : sessionsToFailover) {
session.postHandleFailover(connection, allSessionReconnected);
}
if (oldConnection != null) {
oldConnection.destroy();
}
@ -764,18 +782,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
/*
* Re-attach sessions all pre-existing sessions to the new remoting connection
*/
private boolean reconnectSessions(final RemotingConnection oldConnection,
final int reconnectAttempts,
final ActiveMQException cause) {
HashSet<ClientSessionInternal> sessionsToFailover;
synchronized (sessions) {
sessionsToFailover = new HashSet<>(sessions);
}
for (ClientSessionInternal session : sessionsToFailover) {
session.preHandleFailover(connection);
}
private boolean reconnectSessions(final Set<ClientSessionInternal> sessionsToFailover,
final RemotingConnection oldConnection,
final int reconnectAttempts,
final ActiveMQException cause) {
getConnectionWithRetry(reconnectAttempts, oldConnection);
if (connection == null) {

View File

@ -141,6 +141,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
private volatile boolean mayAttemptToFailover = true;
private volatile boolean resetCreditManager = false;
/**
* Current XID. this will be used in case of failover
*/
@ -1387,8 +1389,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return true;
}
boolean resetCreditManager = false;
try {
// TODO remove this and encapsulate it
@ -1463,30 +1463,42 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
suc = false;
} finally {
sessionContext.releaseCommunications();
}
if (resetCreditManager) {
synchronized (producerCreditManager) {
producerCreditManager.reset();
}
// Also need to send more credits for consumers, otherwise the system could hand with the server
// not having any credits to send
}
}
HashMap<String, String> metaDataToSend;
synchronized (metadata) {
metaDataToSend = new HashMap<>(metadata);
}
sessionContext.resetMetadata(metaDataToSend);
return suc;
}
@Override
public void postHandleFailover(RemotingConnection connection, boolean successful) {
sessionContext.releaseCommunications();
if (successful) {
synchronized (this) {
if (closed) {
return;
}
if (resetCreditManager) {
synchronized (producerCreditManager) {
producerCreditManager.reset();
}
resetCreditManager = false;
// Also need to send more credits for consumers, otherwise the system could hand with the server
// not having any credits to send
}
}
HashMap<String, String> metaDataToSend;
synchronized (metadata) {
metaDataToSend = new HashMap<>(metadata);
}
sessionContext.resetMetadata(metaDataToSend);
}
}
@Override

View File

@ -68,6 +68,8 @@ public interface ClientSessionInternal extends ClientSession {
boolean handleFailover(RemotingConnection backupConnection, ActiveMQException cause);
void postHandleFailover(RemotingConnection connection, boolean successful);
RemotingConnection getConnection();
void cleanUp(boolean failingOver) throws ActiveMQException;

View File

@ -184,6 +184,13 @@ public interface Channel {
*/
int getLastConfirmedCommandID();
/**
* queries if this channel is locked. This method is designed for use in monitoring of the system state, not for synchronization control.
*
* @return true it the channel is locked and false otherwise
*/
boolean isLocked();
/**
* locks the channel.
* <p>

View File

@ -139,7 +139,7 @@ public class ActiveMQSessionContext extends SessionContext {
private String name;
private boolean killed;
protected Channel getSessionChannel() {
public Channel getSessionChannel() {
return sessionChannel;
}

View File

@ -689,6 +689,11 @@ public final class ChannelImpl implements Channel {
}
}
@Override
public boolean isLocked() {
return failingOver;
}
@Override
public void lock() {
if (logger.isTraceEnabled()) {

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -50,8 +51,10 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
@ -1971,6 +1974,44 @@ public class FailoverTest extends FailoverTestBase {
Assert.assertNotNull(message);
}
@Test(timeout = 120000)
public void testChannelStateDuringFailover() throws Exception {
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
final AtomicBoolean channelLockedDuringFailover = new AtomicBoolean(false);
ClientSession session = createSession(sf, true, true, 0);
backupServer.addInterceptor(
new Interceptor() {
private int index = 0;
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (index < 1 && packet.getType() == PacketImpl.CREATESESSION) {
sf.getConnection().addCloseListener(() -> {
index++;
ActiveMQSessionContext sessionContext = (ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext();
channelLockedDuringFailover.set(sessionContext.getSessionChannel().isLocked());
});
Channel sessionChannel = ((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1);
sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException()));
return false;
}
return true;
}
});
session.start();
crash(session);
Assert.assertTrue(channelLockedDuringFailover.get());
}
@Test(timeout = 120000)
public void testForceBlockingReturn() throws Exception {
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100);

View File

@ -300,6 +300,11 @@ public class BackupSyncDelay implements Interceptor {
throw new UnsupportedOperationException();
}
@Override
public boolean isLocked() {
return false;
}
@Override
public void lock() {
throw new UnsupportedOperationException();