This commit is contained in:
Clebert Suconic 2021-06-14 16:37:23 -04:00
commit f6a4c8fcde
1 changed files with 51 additions and 0 deletions

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionOutcomeUnknownException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionRolledBackException;
@ -49,6 +50,12 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
@ -59,6 +66,7 @@ import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
@ -1911,6 +1919,49 @@ public class FailoverTest extends FailoverTestBase {
receiveMessages(consumer);
}
@Test(timeout = 120000)
public void testMultipleSessionFailover() throws Exception {
final String address = "TEST";
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100);
sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session1 = createSession(sf, true, true, 0);
ClientSession session2 = createSession(sf, true, true, 0);
backupServer.addInterceptor(
new Interceptor() {
private int index = 0;
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
if (packet.getType() == PacketImpl.CREATESESSION) {
index++;
if (index == 2 || index == 3) {
Channel sessionChannel = ((RemotingConnectionImpl) connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id, -1);
sessionChannel.send(new ActiveMQExceptionMessage(new ActiveMQInternalErrorException()));
return false;
}
}
return true;
}
});
session1.start();
session2.start();
crash(session1, session2);
session1.createQueue(new QueueConfiguration(address).setAddress(address));
ClientProducer clientProducer = session1.createProducer(address);
clientProducer.send(session1.createMessage(false));
ClientConsumer clientConsumer = session2.createConsumer(address);
ClientMessage message = clientConsumer.receive(3000);
Assert.assertNotNull(message);
}
@Test(timeout = 120000)
public void testForceBlockingReturn() throws Exception {
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100);