ARTEMIS-2559 Connection failure should rollback pending XA TX

This commit is contained in:
Clebert Suconic 2019-11-21 15:13:38 -05:00
parent 2bf2dba8d3
commit 91cbbb8698
5 changed files with 260 additions and 6 deletions

View File

@ -139,6 +139,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected Transaction tx;
/** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
* in a failure scenario (client is gone), this will be held between xaEnd and xaCommit. */
protected volatile Transaction pendingTX;
protected boolean xa;
protected final PagingManager pagingManager;
@ -384,6 +388,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (closed)
return;
if (failed) {
Transaction txToRollback = tx;
if (txToRollback != null) {
txToRollback.rollbackIfPossible();
}
txToRollback = pendingTX;
if (txToRollback != null) {
txToRollback.rollbackIfPossible();
}
} else {
if (tx != null && tx.getXid() == null) {
// We only rollback local txs on close, not XA tx branches
@ -394,6 +412,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
}
}
//putting closing of consumers outside the sync block
//https://issues.jboss.org/browse/HORNETQ-1141
@ -1252,6 +1271,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception {
this.pendingTX = null;
if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
@ -1310,6 +1330,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
}
} else {
this.pendingTX = tx;
tx = null;
}
} else {
@ -1395,6 +1416,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public synchronized void xaRollback(final Xid xid) throws Exception {
this.pendingTX = null;
if (tx != null && tx.getXid().equals(xid)) {
final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();

View File

@ -51,6 +51,11 @@ public interface Transaction {
void rollback() throws Exception;
/** In a ServerSession failure scenario,\
* we may try to rollback, however only if it's not prepared.
* In case it's prepared, we will just let it be and let the transaction manager to deal with it */
void rollbackIfPossible();
long getID();
Xid getXid();

View File

@ -357,6 +357,27 @@ public class TransactionImpl implements Transaction {
state = State.COMMITTED;
}
@Override
public void rollbackIfPossible() {
synchronized (timeoutLock) {
if (state == State.ROLLEDBACK) {
// I don't think this could happen, but just in case
logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
return;
}
if (state != State.PREPARED) {
try {
internalRollback(sorted);
} catch (Exception e) {
// nothing we can do beyond logging
// no need to special handler here as this was not even supposed to happen at this point
// even if it happenes this would be the exception of the exception, so we just log here
logger.warn(e.getMessage(), e);
}
}
}
}
@Override
public void rollback() throws Exception {
if (logger.isTraceEnabled()) {

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.xa;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class SessionFailureXATest extends ActiveMQTestBase {
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
private ActiveMQServer messagingService;
private ClientSession clientSession;
private ClientSessionFactory sessionFactory;
private Configuration configuration;
private final SimpleString atestq = new SimpleString("BasicXaTestq");
private ServerLocator locator;
private StoreConfiguration.StoreType storeType;
public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@Parameterized.Parameters(name = "storeType={0}")
public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}};
return Arrays.asList(params);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
addressSettings.clear();
if (storeType == StoreConfiguration.StoreType.DATABASE) {
configuration = createDefaultJDBCConfig(true);
} else {
configuration = createDefaultNettyConfig();
}
messagingService = createServer(true, configuration, -1, -1, addressSettings);
// start the server
messagingService.start();
locator = createInVMNonHALocator();
locator.setAckBatchSize(0);
sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
clientSession.createQueue(atestq, atestq, null, true);
}
@Test
public void testFailureWithXAEnd() throws Exception {
testFailure(true);
}
@Test
public void testFailureWithoutXAEnd() throws Exception {
testFailure(false);
}
public void testFailure(boolean xaEnd) throws Exception {
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
try {
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
ClientMessage m2 = createTextMessage(clientSession2, "m2");
ClientMessage m3 = createTextMessage(clientSession2, "m3");
ClientMessage m4 = createTextMessage(clientSession2, "m4");
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
} finally {
clientSession2.close();
}
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
if (xaEnd) {
// We are validating both cases, where xaEnd succeeded and didn't succeed
// so this tests is parameterized to validate both cases.
clientSession.end(xid, XAResource.TMSUCCESS);
}
Wait.assertEquals(1, () -> messagingService.getSessions().size());
for (ServerSession serverSession : messagingService.getSessions()) {
serverSession.getRemotingConnection().fail(new ActiveMQException("fail this"));
serverSession.getRemotingConnection().disconnect(false);
}
Wait.assertEquals(0, () -> messagingService.getSessions().size());
locator = createInVMNonHALocator();
sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
Wait.assertEquals(1, () -> messagingService.getSessions().size());
xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
clientSession.start();
clientConsumer = clientSession.createConsumer(atestq);
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
}
}

View File

@ -139,6 +139,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
}
@Override
public void rollbackIfPossible() {
}
@Override
public void commit(final boolean onePhase) throws Exception {