From 0a12bcb928f151b5ace9a0cffc34ec717b6a8e9c Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 27 Nov 2015 12:20:12 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-2191 https://issues.apache.org/jira/browse/AMQ-3529 - rework fixes to remove uncertanty from dealing with intettuptedexception. Sync requests will trap interrupts that ocurr while waiting for responses and fail the connection with an interruptedioexception. Interrupts pending before requests will be suppressed, allowing possible clean shutdown. It is not safe to replay openwire ops b/c they are not idempotent, the only safe option is to have a teardown of the broker side state from a close --- .../activemq/transport/vm/VMTransport.java | 1 + .../apache/activemq/ActiveMQConnection.java | 8 - .../activemq/ActiveMQMessageConsumer.java | 6 - .../org/apache/activemq/ActiveMQSession.java | 4 - .../apache/activemq/TransactionContext.java | 76 ++--- .../activemq/transport/FutureResponse.java | 40 ++- .../transport/ResponseCorrelator.java | 4 +- .../TransportDisposedIOException.java | 4 + .../transport/WireFormatNegotiator.java | 19 +- .../transport/failover/FailoverTransport.java | 2 +- .../transport/fanout/FanoutTransport.java | 2 +- .../apache/activemq/util/ThreadPoolUtils.java | 6 +- .../transport/http/HttpClientTransport.java | 1 + .../ActiveMQXAConnectionTxInterruptTest.java | 275 ++++++++++++++++++ .../apache/activemq/bugs/AMQ3529v2Test.java | 248 ++++++++++++++++ .../vm/VMTransportThreadSafeTest.java | 2 +- 16 files changed, 616 insertions(+), 82 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 92c9c5178e..9e13cf9502 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -122,6 +122,7 @@ public class VMTransport implements Transport, Task { } } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); iioe.initCause(e); throw iioe; diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index aecece175b..3b2833d9a5 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -629,12 +629,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon */ @Override public void close() throws JMSException { - // Store the interrupted state and clear so that cleanup happens without - // leaking connection resources. Reset in finally to preserve state. - boolean interrupted = Thread.interrupted(); - try { - // If we were running, lets stop first. if (!closed.get() && !transportFailed.get()) { // do not fail if already closed as according to JMS spec we must not @@ -722,9 +717,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon ServiceSupport.dispose(this.transport); factoryStats.removeConnection(this); - if (interrupted) { - Thread.currentThread().interrupt(); - } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index edc383f896..a67022bdca 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -726,17 +726,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } void doClose() throws JMSException { - // Store interrupted state and clear so that Transport operations don't - // throw InterruptedException and we ensure that resources are cleaned up. - boolean interrupted = Thread.interrupted(); dispose(); RemoveInfo removeCommand = info.createRemoveCommand(); LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); this.session.asyncSendPacket(removeCommand); - if (interrupted) { - Thread.currentThread().interrupt(); - } } void inProgressClearRequired() { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 5c7cc4f2bf..6603a2f82f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -660,14 +660,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } private void doClose() throws JMSException { - boolean interrupted = Thread.interrupted(); dispose(); RemoveInfo removeCommand = info.createRemoveCommand(); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); connection.asyncSendPacket(removeCommand); - if (interrupted) { - Thread.currentThread().interrupt(); - } } final AtomicInteger clearRequestsCounter = new AtomicInteger(0); diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index 27cb49ded4..6bd7402550 100755 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -16,7 +16,6 @@ */ package org.apache.activemq; -import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -29,13 +28,11 @@ import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; -import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.DataArrayResponse; import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.IntegerResponse; import org.apache.activemq.command.LocalTransactionId; -import org.apache.activemq.command.Response; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.XATransactionId; @@ -330,7 +327,7 @@ public class TransactionContext implements XAResource { this.transactionId = null; // Notify the listener that the tx was committed back try { - syncSendPacketWithInterruptionHandling(info); + this.connection.syncSendPacket(info); if (localTransactionEventListener != null) { localTransactionEventListener.commitEvent(); } @@ -403,32 +400,36 @@ public class TransactionContext implements XAResource { if (!equals(associatedXid, xid)) { throw new XAException(XAException.XAER_PROTO); } - - // TODO: we may want to put the xid in a suspended list. - try { - beforeEnd(); - } catch (JMSException e) { - throw toXAException(e); - } finally { - setXid(null); - } + invokeBeforeEnd(); } else if ((flags & TMSUCCESS) == TMSUCCESS) { // set to null if this is the current xid. // otherwise this could be an asynchronous success call if (equals(associatedXid, xid)) { - try { - beforeEnd(); - } catch (JMSException e) { - throw toXAException(e); - } finally { - setXid(null); - } + invokeBeforeEnd(); } } else { throw new XAException(XAException.XAER_INVAL); } } + private void invokeBeforeEnd() throws XAException { + boolean throwingException = false; + try { + beforeEnd(); + } catch (JMSException e) { + throwingException = true; + throw toXAException(e); + } finally { + try { + setXid(null); + } catch (XAException ignoreIfWillMask){ + if (!throwingException) { + throw ignoreIfWillMask; + } + } + } + } + private boolean equals(Xid xid1, Xid xid2) { if (xid1 == xid2) { return true; @@ -465,7 +466,7 @@ public class TransactionContext implements XAResource { TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); // Find out if the server wants to commit or rollback. - IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); + IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); if (XAResource.XA_RDONLY == response.getResult()) { // transaction stops now, may be syncs that need a callback List l; @@ -534,7 +535,7 @@ public class TransactionContext implements XAResource { // Let the server know that the tx is rollback. TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); - syncSendPacketWithInterruptionHandling(info); + this.connection.syncSendPacket(info); List l; synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { @@ -581,7 +582,7 @@ public class TransactionContext implements XAResource { // Notify the server that the tx was committed back TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); - syncSendPacketWithInterruptionHandling(info); + this.connection.syncSendPacket(info); List l; synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { @@ -643,7 +644,7 @@ public class TransactionContext implements XAResource { try { // Tell the server to forget the transaction. - syncSendPacketWithInterruptionHandling(info); + this.connection.syncSendPacket(info); } catch (JMSException e) { throw toXAException(e); } @@ -741,7 +742,7 @@ public class TransactionContext implements XAResource { if (transactionId != null) { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); try { - syncSendPacketWithInterruptionHandling(info); + this.connection.syncSendPacket(info); LOG.debug("{} ended XA transaction {}", this, transactionId); } catch (JMSException e) { disassociate(); @@ -773,31 +774,6 @@ public class TransactionContext implements XAResource { transactionId = null; } - /** - * Sends the given command. Also sends the command in case of interruption, - * so that important commands like rollback and commit are never interrupted. - * If interruption occurred, set the interruption state of the current - * after performing the action again. - * - * @return the response - */ - private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { - try { - return this.connection.syncSendPacket(command); - } catch (JMSException e) { - if (e.getLinkedException() instanceof InterruptedIOException) { - try { - Thread.interrupted(); - return this.connection.syncSendPacket(command); - } finally { - Thread.currentThread().interrupt(); - } - } - - throw e; - } - } - /** * Converts a JMSException from the server to an XAException. if the * JMSException contained a linked XAException that is returned instead. diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java index ba4bd67522..ff95869943 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/FutureResponse.java @@ -29,25 +29,51 @@ public class FutureResponse { private static final Logger LOG = LoggerFactory.getLogger(FutureResponse.class); private final ResponseCallback responseCallback; + private final TransportFilter transportFilter; + private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1); public FutureResponse(ResponseCallback responseCallback) { + this(responseCallback, null); + } + + public FutureResponse(ResponseCallback responseCallback, TransportFilter transportFilter) { this.responseCallback = responseCallback; + this.transportFilter = transportFilter; } public Response getResult() throws IOException { + boolean hasInterruptPending = Thread.interrupted(); try { return responseSlot.take(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Operation interupted: " + e, e); + hasInterruptPending = false; + throw dealWithInterrupt(e); + } finally { + if (hasInterruptPending) { + Thread.currentThread().interrupt(); } - throw new InterruptedIOException("Interrupted."); } } + private InterruptedIOException dealWithInterrupt(InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Operation interrupted: " + e, e); + } + InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage()); + interruptedIOException.initCause(e); + try { + if (transportFilter != null) { + transportFilter.onException(interruptedIOException); + } + } finally { + Thread.currentThread().interrupt(); + } + return interruptedIOException; + } + public Response getResult(int timeout) throws IOException { + final boolean wasInterrupted = Thread.interrupted(); try { Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS); if (result == null && timeout > 0) { @@ -55,7 +81,11 @@ public class FutureResponse { } return result; } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted."); + throw dealWithInterrupt(e); + } finally { + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java index eca76a7c01..ad18ea6142 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java @@ -64,7 +64,7 @@ public class ResponseCorrelator extends TransportFilter { Command command = (Command) o; command.setCommandId(sequenceGenerator.getNextSequenceId()); command.setResponseRequired(true); - FutureResponse future = new FutureResponse(responseCallback); + FutureResponse future = new FutureResponse(responseCallback, this); IOException priorError = null; synchronized (requestMap) { priorError = this.error; @@ -122,7 +122,7 @@ public class ResponseCorrelator extends TransportFilter { * any of current requests. Lets let them know of the problem. */ public void onException(IOException error) { - dispose(error); + dispose(new TransportDisposedIOException("Disposed due to prior exception", error)); super.onException(error); } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java index 632fc05bb8..1d577775b6 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportDisposedIOException.java @@ -38,4 +38,8 @@ public class TransportDisposedIOException extends IOException { super(message); } + public TransportDisposedIOException(String message, Throwable cause) { + super(message, cause); + } + } diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java index c86c6ed46b..fe3b179c1e 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java @@ -93,13 +93,25 @@ public class WireFormatNegotiator extends TransportFilter { } public void oneway(Object command) throws IOException { + boolean wasInterrupted = Thread.interrupted(); try { - if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) { + if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) { throw new IOException("Wire format negotiation timeout: peer did not send his wire format."); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(); + InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation"); + interruptedIOException.initCause(e); + try { + onException(interruptedIOException); + } finally { + Thread.currentThread().interrupt(); + wasInterrupted = false; + } + throw interruptedIOException; + } finally { + if (wasInterrupted) { + Thread.currentThread().interrupt(); + } } super.oneway(command); } @@ -143,6 +155,7 @@ public class WireFormatNegotiator extends TransportFilter { } catch (IOException e) { onException(e); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); onException((IOException)new InterruptedIOException().initCause(e)); } catch (Exception e) { onException(IOExceptionSupport.create(e)); diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 0f36d67ee0..7f7d7c6da9 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -130,7 +130,7 @@ public class FailoverTransport implements CompositeTransport { private String nestedExtraQueryOptions; private boolean shuttingDown = false; - public FailoverTransport() throws InterruptedIOException { + public FailoverTransport() { brokerSslContext = SslContext.getCurrentSslContext(); stateTracker.setTrackTransactions(true); // Setup a task that is used to reconnect the a connection async. diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java index 0d933e5e22..00ae7ae453 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java @@ -157,7 +157,7 @@ public class FanoutTransport implements CompositeTransport { } } - public FanoutTransport() throws InterruptedIOException { + public FanoutTransport() { // Setup a task that is used to reconnect the a connection async. reconnectTaskFactory = new TaskRunnerFactory(); reconnectTaskFactory.init(); diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java index 1a3dc340de..27b69fc37d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ThreadPoolUtils.java @@ -124,7 +124,11 @@ public final class ThreadPoolUtils { warned = true; LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); // we were interrupted during shutdown, so force shutdown - executorService.shutdownNow(); + try { + executorService.shutdownNow(); + } finally { + Thread.currentThread().interrupt(); + } } } diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java index 4715d022bb..c65dbb9980 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java @@ -184,6 +184,7 @@ public class HttpClientTransport extends HttpTransportSupport { Thread.sleep(1000); } catch (InterruptedException e) { onException(new InterruptedIOException()); + Thread.currentThread().interrupt(); break; } } else { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java new file mode 100644 index 0000000000..a297121d46 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ActiveMQXAConnectionTxInterruptTest.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; +import javax.jms.XASession; +import javax.transaction.xa.XAException; +import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.MutableBrokerFilter; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.transaction.Synchronization; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.*; + +public class ActiveMQXAConnectionTxInterruptTest { + private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionTxInterruptTest.class); + long txGenerator = System.currentTimeMillis(); + private BrokerService broker; + XASession session; + XAResource resource; + ActiveMQXAConnection xaConnection; + Destination dest; + + @Before + public void startBrokerEtc() throws Exception { + broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/BRXA")); + broker.setPersistent(false); + broker.start(); + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")"); + cf1.setStatsEnabled(true); + xaConnection = (ActiveMQXAConnection)cf1.createConnection(); + xaConnection.start(); + session = xaConnection.createXASession(); + resource = session.getXAResource(); + + dest = new ActiveMQQueue("Q"); + + } + + @After + public void tearDown() throws Exception { + try { + xaConnection.close(); + } catch (Throwable ignore) { + } + try { + broker.stop(); + } catch (Throwable ignore) { + } + } + + + @Test + public void testRollbackAckInterrupted() throws Exception { + + // publish a message + publishAMessage(); + Xid tid; + + // consume in tx and rollback with interrupt + session = xaConnection.createXASession(); + final MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + ((TransactionContext)resource).addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source")); + Thread.currentThread().interrupt(); + } + }); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMFAIL); + resource.rollback(tid); + session.close(); + assertTrue("Was interrupted", Thread.currentThread().isInterrupted()); + } + + @Test + public void testCommitAckInterrupted() throws Exception { + + // publish a message + publishAMessage(); + + // consume in tx and rollback with interrupt + session = xaConnection.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + Xid tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + ((TransactionContext)resource).addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source")); + Thread.currentThread().interrupt(); + } + }); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + } + + @Test + public void testInterruptWhilePendingResponseToAck() throws Exception { + + final LinkedList errors = new LinkedList(); + final CountDownLatch blockedServerSize = new CountDownLatch(1); + final CountDownLatch canContinue = new CountDownLatch(1); + MutableBrokerFilter filter = (MutableBrokerFilter)broker.getBroker().getAdaptor(MutableBrokerFilter.class); + filter.setNext(new MutableBrokerFilter(filter.getNext()) { + @Override + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { + blockedServerSize.countDown(); + canContinue.await(); + super.acknowledge(consumerExchange, ack); + } + }); + + publishAMessage(); + + // consume in tx and rollback with interrupt while pending reply + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + session = xaConnection.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + Xid tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + + try { + resource.end(tid, XAResource.TMSUCCESS); + fail("Expect end to fail"); + } catch (Throwable expectedWithInterrupt) { + assertTrue(expectedWithInterrupt instanceof XAException); + assertCause(expectedWithInterrupt, new Class[]{InterruptedException.class}); + } + + try { + resource.rollback(tid); + fail("Expect rollback to fail due to connection being closed"); + } catch (Throwable expectedWithInterrupt) { + assertTrue(expectedWithInterrupt instanceof XAException); + assertCause(expectedWithInterrupt, new Class[]{ConnectionClosedException.class, InterruptedException.class}); + } + session.close(); + + assertTrue("Was interrupted", Thread.currentThread().isInterrupted()); + + } catch (Throwable error) { + error.printStackTrace(); + errors.add(error); + } + } + }); + + assertTrue("got to blocking call", blockedServerSize.await(20, TimeUnit.SECONDS)); + + // will interrupt + executorService.shutdownNow(); + canContinue.countDown(); + + assertTrue("job done", executorService.awaitTermination(20, TimeUnit.SECONDS)); + + assertTrue("no errors: " + errors, errors.isEmpty()); + } + + private void assertCause(Throwable expectedWithInterrupt, Class[] exceptionClazzes) { + Throwable candidate = expectedWithInterrupt; + + while (candidate != null) { + for (Class exceptionClazz: exceptionClazzes) { + if (exceptionClazz.isInstance(candidate)) { + return; + } + } + candidate = candidate.getCause(); + } + LOG.error("ex", expectedWithInterrupt); + fail("no expected type as cause:" + expectedWithInterrupt); + } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 87; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + + } + + private void publishAMessage() throws IOException, XAException, JMSException { + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + } + + + private String getName() { + return this.getClass().getName(); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java new file mode 100644 index 0000000000..030f2b4822 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529v2Test.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.activemq.bugs; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class AMQ3529v2Test { + + private static Logger LOG = LoggerFactory.getLogger(AMQ3529v2Test.class); + + private BrokerService broker; + private String connectionUri; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://0.0.0.0:0"); + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test(timeout = 60000) + public void testRandomInterruptionAffects() throws Exception { + doTestRandomInterruptionAffects(); + } + + @Test(timeout = 60000) + public void testRandomInterruptionAffectsWithFailover() throws Exception { + connectionUri = "failover:(" + connectionUri + ")"; + doTestRandomInterruptionAffects(); + } + + public void doTestRandomInterruptionAffects() throws Exception { + final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + + ThreadGroup tg = new ThreadGroup("tg"); + + assertEquals(0, tg.activeCount()); + + class ClientThread extends Thread { + + public Exception error; + + public ClientThread(ThreadGroup tg, String name) { + super(tg, name); + } + + @Override + public void run() { + Context ctx = null; + Connection connection = null; + Session session = null; + MessageConsumer consumer = null; + + try { + connection = connectionFactory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + + Properties props = new Properties(); + props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + props.setProperty(Context.PROVIDER_URL, connectionUri); + ctx = null; + try { + ctx = new InitialContext(props); + } catch (NoClassDefFoundError e) { + throw new NamingException(e.toString()); + } catch (Exception e) { + throw new NamingException(e.toString()); + } + Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C"); + consumer = session.createConsumer(destination); + consumer.receive(10000); + } catch (Exception e) { + // Expect an exception here from the interrupt. + } finally { + try { + if (consumer != null) { + consumer.close(); + } + } catch (JMSException e) { + trackException("Consumer Close failed with", e); + } + try { + if (session != null) { + session.close(); + } + } catch (JMSException e) { + trackException("Session Close failed with", e); + } + try { + if (connection != null) { + connection.close(); + } + } catch (JMSException e) { + trackException("Connection Close failed with", e); + } + try { + if (ctx != null) { + ctx.close(); + } + } catch (Exception e) { + trackException("Connection Close failed with", e); + } + } + } + + private void trackException(String s, Exception e) { + LOG.error(s, e); + this.error = e; + } + } + + final Random random = new Random(); + List threads = new LinkedList(); + for (int i=0;i<10;i++) { + threads.add(new ClientThread(tg, "Client-"+ i)); + } + for (Thread thread : threads) { + thread.start(); + } + // interrupt the threads at some random time + ExecutorService doTheInterrupts = Executors.newFixedThreadPool(threads.size()); + for (final Thread thread : threads) { + doTheInterrupts.execute(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(random.nextInt(5000)); + } catch (InterruptedException ignored) { + ignored.printStackTrace(); + } + thread.interrupt(); + } + }); + } + doTheInterrupts.shutdown(); + assertTrue("all interrupts done", doTheInterrupts.awaitTermination(30, TimeUnit.SECONDS)); + + for (Thread thread : threads) { + thread.join(); + } + + for (ClientThread thread : threads) { + if (thread.error != null) { + LOG.info("Close error on thread: " + thread, thread.error); + } + } + + Thread[] remainThreads = new Thread[tg.activeCount()]; + tg.enumerate(remainThreads); + for (final Thread t : remainThreads) { + if (t != null && t.isAlive() && !t.isDaemon()) + assertTrue("Thread completes:" + t, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Remaining thread: " + t.toString()); + return !t.isAlive(); + } + })); + } + + ThreadGroup root = Thread.currentThread().getThreadGroup().getParent(); + while (root.getParent() != null) { + root = root.getParent(); + } + visit(root, 0); + } + + // This method recursively visits all thread groups under `group'. + public static void visit(ThreadGroup group, int level) { + // Get threads in `group' + int numThreads = group.activeCount(); + Thread[] threads = new Thread[numThreads * 2]; + numThreads = group.enumerate(threads, false); + + // Enumerate each thread in `group' + for (int i = 0; i < numThreads; i++) { + // Get thread + Thread thread = threads[i]; + LOG.debug("Thread:" + thread.getName() + " is still running"); + } + + // Get thread subgroups of `group' + int numGroups = group.activeGroupCount(); + ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; + numGroups = group.enumerate(groups, false); + + // Recursively visit each subgroup + for (int i = 0; i < numGroups; i++) { + visit(groups[i], level + 1); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java index c5c4706ca5..eccbf1bf10 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java @@ -318,7 +318,7 @@ public class VMTransportThreadSafeTest { // simulate broker stop remote.stop(); - assertTrue(Wait.waitFor(new Wait.Condition() { + assertTrue("got expected exception response", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { LOG.info("answer: " + answer[0]);