mirror of https://github.com/apache/activemq.git
resolve https://issues.apache.org/activemq/browse/AMQ-2556 - resolve leaks with XA_RDONLY - prepare needs to cleanup the transaction state, both on the broker and on the client connection/session/failover state
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@930135 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c22ea7fab0
commit
4eafcccf62
|
@ -426,6 +426,18 @@ public class TransactionContext implements XAResource {
|
||||||
|
|
||||||
// Find out if the server wants to commit or rollback.
|
// Find out if the server wants to commit or rollback.
|
||||||
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
|
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
|
||||||
|
if (XAResource.XA_RDONLY == response.getResult()) {
|
||||||
|
// transaction stops now, may be syncs that need a callback
|
||||||
|
List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
|
||||||
|
if (l != null && !l.isEmpty()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
|
||||||
|
}
|
||||||
|
for (TransactionContext ctx : l) {
|
||||||
|
ctx.afterCommit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return response.getResult();
|
return response.getResult();
|
||||||
|
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
|
|
|
@ -31,6 +31,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import javax.transaction.xa.XAResource;
|
||||||
|
|
||||||
import org.apache.activemq.broker.ft.MasterBroker;
|
import org.apache.activemq.broker.ft.MasterBroker;
|
||||||
import org.apache.activemq.broker.region.ConnectionStatistics;
|
import org.apache.activemq.broker.region.ConnectionStatistics;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
@ -389,7 +392,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
|
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
|
||||||
if (transactionState == null) {
|
if (transactionState == null) {
|
||||||
throw new IllegalStateException("Cannot prepare a transaction that had not been started: "
|
throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
|
||||||
+ info.getTransactionId());
|
+ info.getTransactionId());
|
||||||
}
|
}
|
||||||
// Avoid dups.
|
// Avoid dups.
|
||||||
|
@ -397,6 +400,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
transactionState.setPrepared(true);
|
transactionState.setPrepared(true);
|
||||||
int result = broker.prepareTransaction(context, info.getTransactionId());
|
int result = broker.prepareTransaction(context, info.getTransactionId());
|
||||||
transactionState.setPreparedResult(result);
|
transactionState.setPreparedResult(result);
|
||||||
|
if (result == XAResource.XA_RDONLY) {
|
||||||
|
// we are done, no further rollback or commit from TM
|
||||||
|
cs.removeTransactionState(info.getTransactionId());
|
||||||
|
}
|
||||||
IntegerResponse response = new IntegerResponse(result);
|
IntegerResponse response = new IntegerResponse(result);
|
||||||
return response;
|
return response;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.Map.Entry;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import javax.jms.TransactionRolledBackException;
|
import javax.jms.TransactionRolledBackException;
|
||||||
|
import javax.transaction.xa.XAResource;
|
||||||
|
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.ConnectionId;
|
import org.apache.activemq.command.ConnectionId;
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.activemq.command.ConsumerId;
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
import org.apache.activemq.command.DestinationInfo;
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
import org.apache.activemq.command.ExceptionResponse;
|
import org.apache.activemq.command.ExceptionResponse;
|
||||||
|
import org.apache.activemq.command.IntegerResponse;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.command.ProducerId;
|
import org.apache.activemq.command.ProducerId;
|
||||||
|
@ -79,20 +81,35 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private class RemoveTransactionAction implements Runnable {
|
private class RemoveTransactionAction implements ResponseHandler {
|
||||||
private final TransactionInfo info;
|
private final TransactionInfo info;
|
||||||
|
|
||||||
public RemoveTransactionAction(TransactionInfo info) {
|
public RemoveTransactionAction(TransactionInfo info) {
|
||||||
this.info = info;
|
this.info = info;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void onResponse(Command response) {
|
||||||
ConnectionId connectionId = info.getConnectionId();
|
ConnectionId connectionId = info.getConnectionId();
|
||||||
ConnectionState cs = connectionStates.get(connectionId);
|
ConnectionState cs = connectionStates.get(connectionId);
|
||||||
cs.removeTransactionState(info.getTransactionId());
|
cs.removeTransactionState(info.getTransactionId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
|
||||||
|
|
||||||
|
public PrepareReadonlyTransactionAction(TransactionInfo info) {
|
||||||
|
super(info);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onResponse(Command command) {
|
||||||
|
IntegerResponse response = (IntegerResponse) command;
|
||||||
|
if (XAResource.XA_RDONLY == response.getResult()) {
|
||||||
|
// all done, no commit or rollback from TM
|
||||||
|
super.onResponse(command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
|
@ -469,10 +486,10 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
|
||||||
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
|
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
|
||||||
if (transactionState != null) {
|
if (transactionState != null) {
|
||||||
transactionState.addCommand(info);
|
transactionState.addCommand(info);
|
||||||
|
return new Tracked(new PrepareReadonlyTransactionAction(info));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return TRACKED_RESPONSE_MARKER;
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.state;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.Command;
|
||||||
|
|
||||||
|
public interface ResponseHandler {
|
||||||
|
public void onResponse(Command command);
|
||||||
|
}
|
|
@ -16,25 +16,26 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.state;
|
package org.apache.activemq.state;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
|
||||||
public class Tracked extends Response {
|
public class Tracked extends Response {
|
||||||
|
|
||||||
private Runnable runnable;
|
private ResponseHandler handler;
|
||||||
|
|
||||||
public Tracked(Runnable runnable) {
|
public Tracked(ResponseHandler runnable) {
|
||||||
this.runnable = runnable;
|
this.handler = runnable;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onResponses() {
|
public void onResponses(Command command) {
|
||||||
if (runnable != null) {
|
if (handler != null) {
|
||||||
runnable.run();
|
handler.onResponse(command);
|
||||||
runnable = null;
|
handler = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isWaitingForResponse() {
|
public boolean isWaitingForResponse() {
|
||||||
return runnable != null;
|
return handler != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
|
object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
|
||||||
}
|
}
|
||||||
if (object != null && object.getClass() == Tracked.class) {
|
if (object != null && object.getClass() == Tracked.class) {
|
||||||
((Tracked) object).onResponses();
|
((Tracked) object).onResponses(command);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!initialized) {
|
if (!initialized) {
|
||||||
|
@ -1011,6 +1011,10 @@ public class FailoverTransport implements CompositeTransport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ConnectionStateTracker getStateTracker() {
|
||||||
|
return stateTracker;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean contains(URI newURI) {
|
private boolean contains(URI newURI) {
|
||||||
|
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
|
@ -37,11 +38,20 @@ import javax.transaction.xa.XAException;
|
||||||
import javax.transaction.xa.XAResource;
|
import javax.transaction.xa.XAResource;
|
||||||
import javax.transaction.xa.Xid;
|
import javax.transaction.xa.Xid;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
import org.apache.activemq.broker.BrokerRegistry;
|
import org.apache.activemq.broker.BrokerRegistry;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransactionBroker;
|
||||||
|
import org.apache.activemq.broker.TransportConnection;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.activemq.command.ConnectionId;
|
||||||
|
import org.apache.activemq.command.TransactionInfo;
|
||||||
|
import org.apache.activemq.command.XATransactionId;
|
||||||
|
import org.apache.activemq.management.JMSConnectionStatsImpl;
|
||||||
|
import org.apache.activemq.transport.Transport;
|
||||||
|
import org.apache.activemq.transport.failover.FailoverTransport;
|
||||||
import org.apache.activemq.transport.stomp.StompTransportFilter;
|
import org.apache.activemq.transport.stomp.StompTransportFilter;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -249,6 +259,88 @@ public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testReadonlyNoLeak() throws Exception {
|
||||||
|
final String brokerName = "readOnlyNoLeak";
|
||||||
|
BrokerService broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/" + brokerName));
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.start();
|
||||||
|
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
|
||||||
|
cf1.setStatsEnabled(true);
|
||||||
|
ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection();
|
||||||
|
xaConnection.start();
|
||||||
|
XASession session = xaConnection.createXASession();
|
||||||
|
XAResource resource = session.getXAResource();
|
||||||
|
Xid tid = createXid();
|
||||||
|
resource.start(tid, XAResource.TMNOFLAGS);
|
||||||
|
session.close();
|
||||||
|
resource.end(tid, XAResource.TMSUCCESS);
|
||||||
|
resource.commit(tid, true);
|
||||||
|
|
||||||
|
assertTransactionGoneFromBroker(tid);
|
||||||
|
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
|
||||||
|
assertSessionGone(xaConnection, session);
|
||||||
|
assertTransactionGoneFromFailoverState(xaConnection, tid);
|
||||||
|
|
||||||
|
// two phase
|
||||||
|
session = xaConnection.createXASession();
|
||||||
|
resource = session.getXAResource();
|
||||||
|
tid = createXid();
|
||||||
|
resource.start(tid, XAResource.TMNOFLAGS);
|
||||||
|
session.close();
|
||||||
|
resource.end(tid, XAResource.TMSUCCESS);
|
||||||
|
assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
|
||||||
|
|
||||||
|
// no need for a commit on read only
|
||||||
|
assertTransactionGoneFromBroker(tid);
|
||||||
|
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
|
||||||
|
assertSessionGone(xaConnection, session);
|
||||||
|
assertTransactionGoneFromFailoverState(xaConnection, tid);
|
||||||
|
|
||||||
|
xaConnection.close();
|
||||||
|
broker.stop();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTransactionGoneFromFailoverState(
|
||||||
|
ActiveMQXAConnection connection1, Xid tid) throws Exception {
|
||||||
|
|
||||||
|
FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
|
||||||
|
TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
|
||||||
|
assertNull("transaction shold not exist in the state tracker",
|
||||||
|
transport.getStateTracker().processCommitTransactionOnePhase(info));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertSessionGone(ActiveMQXAConnection connection1,
|
||||||
|
XASession session) {
|
||||||
|
JMSConnectionStatsImpl stats = (JMSConnectionStatsImpl)connection1.getStats();
|
||||||
|
// should be no dangling sessions maintained by the transaction
|
||||||
|
assertEquals("should be no sessions", 0, stats.getSessions().length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTransactionGoneFromConnection(String brokerName, String clientId, ConnectionId connectionId, Xid tid) throws Exception {
|
||||||
|
BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName);
|
||||||
|
CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections();
|
||||||
|
for (TransportConnection connection: connections) {
|
||||||
|
if (connection.getConnectionId().equals(clientId)) {
|
||||||
|
try {
|
||||||
|
connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
|
||||||
|
fail("did not get expected excepton on missing transaction, it must be still there in error!");
|
||||||
|
} catch (IllegalStateException expectedOnNoTransaction) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertTransactionGoneFromBroker(Xid tid) throws Exception {
|
||||||
|
BrokerService broker = BrokerRegistry.getInstance().lookup("localhost");
|
||||||
|
TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
|
||||||
|
try {
|
||||||
|
transactionBroker.getTransaction(null, new XATransactionId(tid), false);
|
||||||
|
fail("expecte ex on tx not found");
|
||||||
|
} catch (XAException expectedOnNotFound) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertCreateConnection(String uri) throws Exception {
|
protected void assertCreateConnection(String uri) throws Exception {
|
||||||
// Start up a broker with a tcp connector.
|
// Start up a broker with a tcp connector.
|
||||||
BrokerService broker = new BrokerService();
|
BrokerService broker = new BrokerService();
|
||||||
|
|
Loading…
Reference in New Issue