https://issues.apache.org/jira/browse/AMQ-2191 https://issues.apache.org/jira/browse/AMQ-3529 - rework fixes to remove uncertanty from dealing with intettuptedexception. Sync requests will trap interrupts that ocurr while waiting for responses and fail the connection with an interruptedioexception. Interrupts pending before requests will be suppressed, allowing possible clean shutdown. It is not safe to replay openwire ops b/c they are not idempotent, the only safe option is to have a teardown of the broker side state from a close

This commit is contained in:
gtully 2015-11-27 12:20:12 +00:00
parent 95f58fa7c4
commit 0a12bcb928
16 changed files with 616 additions and 82 deletions

View File

@ -122,6 +122,7 @@ public class VMTransport implements Transport, Task {
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
throw iioe;

View File

@ -629,12 +629,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
*/
@Override
public void close() throws JMSException {
// Store the interrupted state and clear so that cleanup happens without
// leaking connection resources. Reset in finally to preserve state.
boolean interrupted = Thread.interrupted();
try {
// If we were running, lets stop first.
if (!closed.get() && !transportFailed.get()) {
// do not fail if already closed as according to JMS spec we must not
@ -722,9 +717,6 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ServiceSupport.dispose(this.transport);
factoryStats.removeConnection(this);
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -726,17 +726,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
void doClose() throws JMSException {
// Store interrupted state and clear so that Transport operations don't
// throw InterruptedException and we ensure that resources are cleaned up.
boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
this.session.asyncSendPacket(removeCommand);
if (interrupted) {
Thread.currentThread().interrupt();
}
}
void inProgressClearRequired() {

View File

@ -660,14 +660,10 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
private void doClose() throws JMSException {
boolean interrupted = Thread.interrupted();
dispose();
RemoveInfo removeCommand = info.createRemoveCommand();
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
connection.asyncSendPacket(removeCommand);
if (interrupted) {
Thread.currentThread().interrupt();
}
}
final AtomicInteger clearRequestsCounter = new AtomicInteger(0);

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -29,13 +28,11 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
@ -330,7 +327,7 @@ public class TransactionContext implements XAResource {
this.transactionId = null;
// Notify the listener that the tx was committed back
try {
syncSendPacketWithInterruptionHandling(info);
this.connection.syncSendPacket(info);
if (localTransactionEventListener != null) {
localTransactionEventListener.commitEvent();
}
@ -403,32 +400,36 @@ public class TransactionContext implements XAResource {
if (!equals(associatedXid, xid)) {
throw new XAException(XAException.XAER_PROTO);
}
// TODO: we may want to put the xid in a suspended list.
try {
beforeEnd();
} catch (JMSException e) {
throw toXAException(e);
} finally {
setXid(null);
}
invokeBeforeEnd();
} else if ((flags & TMSUCCESS) == TMSUCCESS) {
// set to null if this is the current xid.
// otherwise this could be an asynchronous success call
if (equals(associatedXid, xid)) {
try {
beforeEnd();
} catch (JMSException e) {
throw toXAException(e);
} finally {
setXid(null);
}
invokeBeforeEnd();
}
} else {
throw new XAException(XAException.XAER_INVAL);
}
}
private void invokeBeforeEnd() throws XAException {
boolean throwingException = false;
try {
beforeEnd();
} catch (JMSException e) {
throwingException = true;
throw toXAException(e);
} finally {
try {
setXid(null);
} catch (XAException ignoreIfWillMask){
if (!throwingException) {
throw ignoreIfWillMask;
}
}
}
}
private boolean equals(Xid xid1, Xid xid2) {
if (xid1 == xid2) {
return true;
@ -465,7 +466,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
// Find out if the server wants to commit or rollback.
IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
if (XAResource.XA_RDONLY == response.getResult()) {
// transaction stops now, may be syncs that need a callback
List<TransactionContext> l;
@ -534,7 +535,7 @@ public class TransactionContext implements XAResource {
// Let the server know that the tx is rollback.
TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
syncSendPacketWithInterruptionHandling(info);
this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@ -581,7 +582,7 @@ public class TransactionContext implements XAResource {
// Notify the server that the tx was committed back
TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
syncSendPacketWithInterruptionHandling(info);
this.connection.syncSendPacket(info);
List<TransactionContext> l;
synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
@ -643,7 +644,7 @@ public class TransactionContext implements XAResource {
try {
// Tell the server to forget the transaction.
syncSendPacketWithInterruptionHandling(info);
this.connection.syncSendPacket(info);
} catch (JMSException e) {
throw toXAException(e);
}
@ -741,7 +742,7 @@ public class TransactionContext implements XAResource {
if (transactionId != null) {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
try {
syncSendPacketWithInterruptionHandling(info);
this.connection.syncSendPacket(info);
LOG.debug("{} ended XA transaction {}", this, transactionId);
} catch (JMSException e) {
disassociate();
@ -773,31 +774,6 @@ public class TransactionContext implements XAResource {
transactionId = null;
}
/**
* Sends the given command. Also sends the command in case of interruption,
* so that important commands like rollback and commit are never interrupted.
* If interruption occurred, set the interruption state of the current
* after performing the action again.
*
* @return the response
*/
private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
try {
return this.connection.syncSendPacket(command);
} catch (JMSException e) {
if (e.getLinkedException() instanceof InterruptedIOException) {
try {
Thread.interrupted();
return this.connection.syncSendPacket(command);
} finally {
Thread.currentThread().interrupt();
}
}
throw e;
}
}
/**
* Converts a JMSException from the server to an XAException. if the
* JMSException contained a linked XAException that is returned instead.

View File

@ -29,25 +29,51 @@ public class FutureResponse {
private static final Logger LOG = LoggerFactory.getLogger(FutureResponse.class);
private final ResponseCallback responseCallback;
private final TransportFilter transportFilter;
private final ArrayBlockingQueue<Response> responseSlot = new ArrayBlockingQueue<Response>(1);
public FutureResponse(ResponseCallback responseCallback) {
this(responseCallback, null);
}
public FutureResponse(ResponseCallback responseCallback, TransportFilter transportFilter) {
this.responseCallback = responseCallback;
this.transportFilter = transportFilter;
}
public Response getResult() throws IOException {
boolean hasInterruptPending = Thread.interrupted();
try {
return responseSlot.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) {
LOG.debug("Operation interupted: " + e, e);
hasInterruptPending = false;
throw dealWithInterrupt(e);
} finally {
if (hasInterruptPending) {
Thread.currentThread().interrupt();
}
throw new InterruptedIOException("Interrupted.");
}
}
private InterruptedIOException dealWithInterrupt(InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Operation interrupted: " + e, e);
}
InterruptedIOException interruptedIOException = new InterruptedIOException(e.getMessage());
interruptedIOException.initCause(e);
try {
if (transportFilter != null) {
transportFilter.onException(interruptedIOException);
}
} finally {
Thread.currentThread().interrupt();
}
return interruptedIOException;
}
public Response getResult(int timeout) throws IOException {
final boolean wasInterrupted = Thread.interrupted();
try {
Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
if (result == null && timeout > 0) {
@ -55,7 +81,11 @@ public class FutureResponse {
}
return result;
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
throw dealWithInterrupt(e);
} finally {
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
}

View File

@ -64,7 +64,7 @@ public class ResponseCorrelator extends TransportFilter {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
FutureResponse future = new FutureResponse(responseCallback, this);
IOException priorError = null;
synchronized (requestMap) {
priorError = this.error;
@ -122,7 +122,7 @@ public class ResponseCorrelator extends TransportFilter {
* any of current requests. Lets let them know of the problem.
*/
public void onException(IOException error) {
dispose(error);
dispose(new TransportDisposedIOException("Disposed due to prior exception", error));
super.onException(error);
}

View File

@ -38,4 +38,8 @@ public class TransportDisposedIOException extends IOException {
super(message);
}
public TransportDisposedIOException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -93,13 +93,25 @@ public class WireFormatNegotiator extends TransportFilter {
}
public void oneway(Object command) throws IOException {
boolean wasInterrupted = Thread.interrupted();
try {
if (!readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
interruptedIOException.initCause(e);
try {
onException(interruptedIOException);
} finally {
Thread.currentThread().interrupt();
wasInterrupted = false;
}
throw interruptedIOException;
} finally {
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
super.oneway(command);
}
@ -143,6 +155,7 @@ public class WireFormatNegotiator extends TransportFilter {
} catch (IOException e) {
onException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
onException((IOException)new InterruptedIOException().initCause(e));
} catch (Exception e) {
onException(IOExceptionSupport.create(e));

View File

@ -130,7 +130,7 @@ public class FailoverTransport implements CompositeTransport {
private String nestedExtraQueryOptions;
private boolean shuttingDown = false;
public FailoverTransport() throws InterruptedIOException {
public FailoverTransport() {
brokerSslContext = SslContext.getCurrentSslContext();
stateTracker.setTrackTransactions(true);
// Setup a task that is used to reconnect the a connection async.

View File

@ -157,7 +157,7 @@ public class FanoutTransport implements CompositeTransport {
}
}
public FanoutTransport() throws InterruptedIOException {
public FanoutTransport() {
// Setup a task that is used to reconnect the a connection async.
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();

View File

@ -124,7 +124,11 @@ public final class ThreadPoolUtils {
warned = true;
LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
// we were interrupted during shutdown, so force shutdown
executorService.shutdownNow();
try {
executorService.shutdownNow();
} finally {
Thread.currentThread().interrupt();
}
}
}

View File

@ -184,6 +184,7 @@ public class HttpClientTransport extends HttpTransportSupport {
Thread.sleep(1000);
} catch (InterruptedException e) {
onException(new InterruptedIOException());
Thread.currentThread().interrupt();
break;
}
} else {

View File

@ -0,0 +1,275 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
public class ActiveMQXAConnectionTxInterruptTest {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQXAConnectionTxInterruptTest.class);
long txGenerator = System.currentTimeMillis();
private BrokerService broker;
XASession session;
XAResource resource;
ActiveMQXAConnection xaConnection;
Destination dest;
@Before
public void startBrokerEtc() throws Exception {
broker = BrokerFactory.createBroker(new URI("broker:(tcp://localhost:0)/BRXA"));
broker.setPersistent(false);
broker.start();
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getConnectUri() + ")");
cf1.setStatsEnabled(true);
xaConnection = (ActiveMQXAConnection)cf1.createConnection();
xaConnection.start();
session = xaConnection.createXASession();
resource = session.getXAResource();
dest = new ActiveMQQueue("Q");
}
@After
public void tearDown() throws Exception {
try {
xaConnection.close();
} catch (Throwable ignore) {
}
try {
broker.stop();
} catch (Throwable ignore) {
}
}
@Test
public void testRollbackAckInterrupted() throws Exception {
// publish a message
publishAMessage();
Xid tid;
// consume in tx and rollback with interrupt
session = xaConnection.createXASession();
final MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
((TransactionContext)resource).addSynchronization(new Synchronization() {
@Override
public void beforeEnd() throws Exception {
LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
Thread.currentThread().interrupt();
}
});
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMFAIL);
resource.rollback(tid);
session.close();
assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
}
@Test
public void testCommitAckInterrupted() throws Exception {
// publish a message
publishAMessage();
// consume in tx and rollback with interrupt
session = xaConnection.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
Xid tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
((TransactionContext)resource).addSynchronization(new Synchronization() {
@Override
public void beforeEnd() throws Exception {
LOG.info("Interrupting thread: " + Thread.currentThread(), new Throwable("Source"));
Thread.currentThread().interrupt();
}
});
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
}
@Test
public void testInterruptWhilePendingResponseToAck() throws Exception {
final LinkedList<Throwable> errors = new LinkedList<Throwable>();
final CountDownLatch blockedServerSize = new CountDownLatch(1);
final CountDownLatch canContinue = new CountDownLatch(1);
MutableBrokerFilter filter = (MutableBrokerFilter)broker.getBroker().getAdaptor(MutableBrokerFilter.class);
filter.setNext(new MutableBrokerFilter(filter.getNext()) {
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
blockedServerSize.countDown();
canContinue.await();
super.acknowledge(consumerExchange, ack);
}
});
publishAMessage();
// consume in tx and rollback with interrupt while pending reply
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
session = xaConnection.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
Xid tid = createXid();
resource = session.getXAResource();
resource.start(tid, XAResource.TMNOFLAGS);
TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
assertNotNull(receivedMessage);
assertEquals(getName(), receivedMessage.getText());
try {
resource.end(tid, XAResource.TMSUCCESS);
fail("Expect end to fail");
} catch (Throwable expectedWithInterrupt) {
assertTrue(expectedWithInterrupt instanceof XAException);
assertCause(expectedWithInterrupt, new Class[]{InterruptedException.class});
}
try {
resource.rollback(tid);
fail("Expect rollback to fail due to connection being closed");
} catch (Throwable expectedWithInterrupt) {
assertTrue(expectedWithInterrupt instanceof XAException);
assertCause(expectedWithInterrupt, new Class[]{ConnectionClosedException.class, InterruptedException.class});
}
session.close();
assertTrue("Was interrupted", Thread.currentThread().isInterrupted());
} catch (Throwable error) {
error.printStackTrace();
errors.add(error);
}
}
});
assertTrue("got to blocking call", blockedServerSize.await(20, TimeUnit.SECONDS));
// will interrupt
executorService.shutdownNow();
canContinue.countDown();
assertTrue("job done", executorService.awaitTermination(20, TimeUnit.SECONDS));
assertTrue("no errors: " + errors, errors.isEmpty());
}
private void assertCause(Throwable expectedWithInterrupt, Class[] exceptionClazzes) {
Throwable candidate = expectedWithInterrupt;
while (candidate != null) {
for (Class<?> exceptionClazz: exceptionClazzes) {
if (exceptionClazz.isInstance(candidate)) {
return;
}
}
candidate = candidate.getCause();
}
LOG.error("ex", expectedWithInterrupt);
fail("no expected type as cause:" + expectedWithInterrupt);
}
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);
os.close();
final byte[] bs = baos.toByteArray();
return new Xid() {
public int getFormatId() {
return 87;
}
public byte[] getGlobalTransactionId() {
return bs;
}
public byte[] getBranchQualifier() {
return bs;
}
};
}
private void publishAMessage() throws IOException, XAException, JMSException {
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
MessageProducer producer = session.createProducer(dest);
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(getName());
producer.send(message);
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
}
private String getName() {
return this.getClass().getName();
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.activemq.bugs;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class AMQ3529v2Test {
private static Logger LOG = LoggerFactory.getLogger(AMQ3529v2Test.class);
private BrokerService broker;
private String connectionUri;
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://0.0.0.0:0");
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Test(timeout = 60000)
public void testRandomInterruptionAffects() throws Exception {
doTestRandomInterruptionAffects();
}
@Test(timeout = 60000)
public void testRandomInterruptionAffectsWithFailover() throws Exception {
connectionUri = "failover:(" + connectionUri + ")";
doTestRandomInterruptionAffects();
}
public void doTestRandomInterruptionAffects() throws Exception {
final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
ThreadGroup tg = new ThreadGroup("tg");
assertEquals(0, tg.activeCount());
class ClientThread extends Thread {
public Exception error;
public ClientThread(ThreadGroup tg, String name) {
super(tg, name);
}
@Override
public void run() {
Context ctx = null;
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.setProperty(Context.PROVIDER_URL, connectionUri);
ctx = null;
try {
ctx = new InitialContext(props);
} catch (NoClassDefFoundError e) {
throw new NamingException(e.toString());
} catch (Exception e) {
throw new NamingException(e.toString());
}
Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
consumer = session.createConsumer(destination);
consumer.receive(10000);
} catch (Exception e) {
// Expect an exception here from the interrupt.
} finally {
try {
if (consumer != null) {
consumer.close();
}
} catch (JMSException e) {
trackException("Consumer Close failed with", e);
}
try {
if (session != null) {
session.close();
}
} catch (JMSException e) {
trackException("Session Close failed with", e);
}
try {
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
trackException("Connection Close failed with", e);
}
try {
if (ctx != null) {
ctx.close();
}
} catch (Exception e) {
trackException("Connection Close failed with", e);
}
}
}
private void trackException(String s, Exception e) {
LOG.error(s, e);
this.error = e;
}
}
final Random random = new Random();
List<ClientThread> threads = new LinkedList<ClientThread>();
for (int i=0;i<10;i++) {
threads.add(new ClientThread(tg, "Client-"+ i));
}
for (Thread thread : threads) {
thread.start();
}
// interrupt the threads at some random time
ExecutorService doTheInterrupts = Executors.newFixedThreadPool(threads.size());
for (final Thread thread : threads) {
doTheInterrupts.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException ignored) {
ignored.printStackTrace();
}
thread.interrupt();
}
});
}
doTheInterrupts.shutdown();
assertTrue("all interrupts done", doTheInterrupts.awaitTermination(30, TimeUnit.SECONDS));
for (Thread thread : threads) {
thread.join();
}
for (ClientThread thread : threads) {
if (thread.error != null) {
LOG.info("Close error on thread: " + thread, thread.error);
}
}
Thread[] remainThreads = new Thread[tg.activeCount()];
tg.enumerate(remainThreads);
for (final Thread t : remainThreads) {
if (t != null && t.isAlive() && !t.isDaemon())
assertTrue("Thread completes:" + t, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Remaining thread: " + t.toString());
return !t.isAlive();
}
}));
}
ThreadGroup root = Thread.currentThread().getThreadGroup().getParent();
while (root.getParent() != null) {
root = root.getParent();
}
visit(root, 0);
}
// This method recursively visits all thread groups under `group'.
public static void visit(ThreadGroup group, int level) {
// Get threads in `group'
int numThreads = group.activeCount();
Thread[] threads = new Thread[numThreads * 2];
numThreads = group.enumerate(threads, false);
// Enumerate each thread in `group'
for (int i = 0; i < numThreads; i++) {
// Get thread
Thread thread = threads[i];
LOG.debug("Thread:" + thread.getName() + " is still running");
}
// Get thread subgroups of `group'
int numGroups = group.activeGroupCount();
ThreadGroup[] groups = new ThreadGroup[numGroups * 2];
numGroups = group.enumerate(groups, false);
// Recursively visit each subgroup
for (int i = 0; i < numGroups; i++) {
visit(groups[i], level + 1);
}
}
}

View File

@ -318,7 +318,7 @@ public class VMTransportThreadSafeTest {
// simulate broker stop
remote.stop();
assertTrue(Wait.waitFor(new Wait.Condition() {
assertTrue("got expected exception response", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("answer: " + answer[0]);