This closes #2899
This commit is contained in:
commit
8969045f92
|
@ -139,6 +139,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
protected Transaction tx;
|
||||
|
||||
/** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
|
||||
* in a failure scenario (client is gone), this will be held between xaEnd and xaCommit. */
|
||||
protected volatile Transaction pendingTX;
|
||||
|
||||
protected boolean xa;
|
||||
|
||||
protected final PagingManager pagingManager;
|
||||
|
@ -384,13 +388,28 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
if (closed)
|
||||
return;
|
||||
|
||||
if (tx != null && tx.getXid() == null) {
|
||||
// We only rollback local txs on close, not XA tx branches
|
||||
if (failed) {
|
||||
|
||||
try {
|
||||
rollback(failed, false);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
|
||||
Transaction txToRollback = tx;
|
||||
if (txToRollback != null) {
|
||||
txToRollback.rollbackIfPossible();
|
||||
}
|
||||
|
||||
txToRollback = pendingTX;
|
||||
|
||||
if (txToRollback != null) {
|
||||
txToRollback.rollbackIfPossible();
|
||||
}
|
||||
|
||||
} else {
|
||||
if (tx != null && tx.getXid() == null) {
|
||||
// We only rollback local txs on close, not XA tx branches
|
||||
|
||||
try {
|
||||
rollback(failed, false);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.unableToRollbackOnClose(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1252,6 +1271,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
@Override
|
||||
public synchronized void xaCommit(final Xid xid, final boolean onePhase) throws Exception {
|
||||
this.pendingTX = null;
|
||||
|
||||
if (tx != null && tx.getXid().equals(xid)) {
|
||||
final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
|
||||
|
@ -1310,6 +1330,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
throw new ActiveMQXAException(XAException.XAER_PROTO, msg);
|
||||
}
|
||||
} else {
|
||||
this.pendingTX = tx;
|
||||
tx = null;
|
||||
}
|
||||
} else {
|
||||
|
@ -1395,6 +1416,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
@Override
|
||||
public synchronized void xaRollback(final Xid xid) throws Exception {
|
||||
this.pendingTX = null;
|
||||
|
||||
if (tx != null && tx.getXid().equals(xid)) {
|
||||
final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
|
||||
|
||||
|
|
|
@ -51,6 +51,11 @@ public interface Transaction {
|
|||
|
||||
void rollback() throws Exception;
|
||||
|
||||
/** In a ServerSession failure scenario,\
|
||||
* we may try to rollback, however only if it's not prepared.
|
||||
* In case it's prepared, we will just let it be and let the transaction manager to deal with it */
|
||||
void rollbackIfPossible();
|
||||
|
||||
long getID();
|
||||
|
||||
Xid getXid();
|
||||
|
|
|
@ -357,6 +357,27 @@ public class TransactionImpl implements Transaction {
|
|||
state = State.COMMITTED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackIfPossible() {
|
||||
synchronized (timeoutLock) {
|
||||
if (state == State.ROLLEDBACK) {
|
||||
// I don't think this could happen, but just in case
|
||||
logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
|
||||
return;
|
||||
}
|
||||
if (state != State.PREPARED) {
|
||||
try {
|
||||
internalRollback(sorted);
|
||||
} catch (Exception e) {
|
||||
// nothing we can do beyond logging
|
||||
// no need to special handler here as this was not even supposed to happen at this point
|
||||
// even if it happenes this would be the exception of the exception, so we just log here
|
||||
logger.warn(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollback() throws Exception {
|
||||
if (logger.isTraceEnabled()) {
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.xa;
|
||||
|
||||
import javax.transaction.xa.XAResource;
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ServerSession;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class SessionFailureXATest extends ActiveMQTestBase {
|
||||
|
||||
private static IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
|
||||
|
||||
private final Map<String, AddressSettings> addressSettings = new HashMap<>();
|
||||
|
||||
private ActiveMQServer messagingService;
|
||||
|
||||
private ClientSession clientSession;
|
||||
|
||||
private ClientSessionFactory sessionFactory;
|
||||
|
||||
private Configuration configuration;
|
||||
|
||||
private final SimpleString atestq = new SimpleString("BasicXaTestq");
|
||||
|
||||
private ServerLocator locator;
|
||||
|
||||
private StoreConfiguration.StoreType storeType;
|
||||
|
||||
public SessionFailureXATest(StoreConfiguration.StoreType storeType) {
|
||||
this.storeType = storeType;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name = "storeType={0}")
|
||||
public static Collection<Object[]> data() {
|
||||
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}};
|
||||
return Arrays.asList(params);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
addressSettings.clear();
|
||||
|
||||
if (storeType == StoreConfiguration.StoreType.DATABASE) {
|
||||
configuration = createDefaultJDBCConfig(true);
|
||||
} else {
|
||||
configuration = createDefaultNettyConfig();
|
||||
}
|
||||
|
||||
messagingService = createServer(true, configuration, -1, -1, addressSettings);
|
||||
|
||||
// start the server
|
||||
messagingService.start();
|
||||
|
||||
locator = createInVMNonHALocator();
|
||||
locator.setAckBatchSize(0);
|
||||
sessionFactory = createSessionFactory(locator);
|
||||
|
||||
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
|
||||
|
||||
clientSession.createQueue(atestq, atestq, null, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWithXAEnd() throws Exception {
|
||||
testFailure(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWithoutXAEnd() throws Exception {
|
||||
testFailure(false);
|
||||
}
|
||||
|
||||
public void testFailure(boolean xaEnd) throws Exception {
|
||||
|
||||
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
|
||||
try {
|
||||
ClientProducer clientProducer = clientSession2.createProducer(atestq);
|
||||
ClientMessage m1 = createTextMessage(clientSession2, "m1");
|
||||
ClientMessage m2 = createTextMessage(clientSession2, "m2");
|
||||
ClientMessage m3 = createTextMessage(clientSession2, "m3");
|
||||
ClientMessage m4 = createTextMessage(clientSession2, "m4");
|
||||
clientProducer.send(m1);
|
||||
clientProducer.send(m2);
|
||||
clientProducer.send(m3);
|
||||
clientProducer.send(m4);
|
||||
} finally {
|
||||
clientSession2.close();
|
||||
}
|
||||
|
||||
Xid xid = newXID();
|
||||
clientSession.start(xid, XAResource.TMNOFLAGS);
|
||||
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
|
||||
clientSession.start();
|
||||
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
|
||||
ClientMessage m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
|
||||
if (xaEnd) {
|
||||
// We are validating both cases, where xaEnd succeeded and didn't succeed
|
||||
// so this tests is parameterized to validate both cases.
|
||||
clientSession.end(xid, XAResource.TMSUCCESS);
|
||||
}
|
||||
|
||||
Wait.assertEquals(1, () -> messagingService.getSessions().size());
|
||||
|
||||
for (ServerSession serverSession : messagingService.getSessions()) {
|
||||
serverSession.getRemotingConnection().fail(new ActiveMQException("fail this"));
|
||||
serverSession.getRemotingConnection().disconnect(false);
|
||||
}
|
||||
|
||||
Wait.assertEquals(0, () -> messagingService.getSessions().size());
|
||||
|
||||
locator = createInVMNonHALocator();
|
||||
sessionFactory = createSessionFactory(locator);
|
||||
clientSession = addClientSession(sessionFactory.createSession(true, false, false));
|
||||
|
||||
Wait.assertEquals(1, () -> messagingService.getSessions().size());
|
||||
|
||||
xid = newXID();
|
||||
|
||||
clientSession.start(xid, XAResource.TMNOFLAGS);
|
||||
clientSession.setTransactionTimeout((int) TimeUnit.MINUTES.toMillis(10));
|
||||
clientSession.start();
|
||||
clientConsumer = clientSession.createConsumer(atestq);
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
|
||||
|
||||
}
|
||||
}
|
|
@ -139,6 +139,11 @@ public class BindingsImplTest extends ActiveMQTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rollbackIfPossible() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit(final boolean onePhase) throws Exception {
|
||||
|
||||
|
|
Loading…
Reference in New Issue