From 58b9a83fe8225b9661dcf3992d0021167160d14a Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Mon, 13 May 2013 21:43:39 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4532 - fix and test - ensure disposed exception is propagated to clients on vm server shutdown git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1482117 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/vm/VMTransport.java | 6 +++ .../vm/VMTransportThreadSafeTest.java | 45 +++++++++++++++++++ .../vm/VmTransportNetworkBrokerTest.java | 2 +- 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index ff23730e32..ca2bd905fa 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -200,6 +200,12 @@ public class VMTransport implements Transport, Task { } catch (Exception ignore) { } + // let any requests pending a response see an exception + try { + peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped.")); + } catch (Exception ignore) { + } + // shutdown task runner factory if (taskRunnerFactory != null) { taskRunnerFactory.shutdownNow(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java index 4a1a211489..fa3ef3c82d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java @@ -29,10 +29,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.BaseCommand; +import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.Response; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.state.CommandVisitor; +import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.ResponseCallback; +import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; @@ -283,6 +287,47 @@ public class VMTransportThreadSafeTest { })); } + @Test(timeout=60000) + public void testRemoteStopSendsExceptionToPendingRequests() throws Exception { + + final VMTransport local = new VMTransport(new URI(location1)); + final VMTransport remote = new VMTransport(new URI(location2)); + + local.setPeer(remote); + remote.setPeer(local); + + final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived); + remote.setTransportListener(remoteListener); + + final Response[] answer = new Response[1]; + ResponseCorrelator responseCorrelator = new ResponseCorrelator(local); + responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived)); + responseCorrelator.start(); + responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() { + @Override + public void onCompletion(FutureResponse resp) { + try { + answer[0] = resp.getResult(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + + // simulate broker stop + remote.stop(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("answer: " + answer[0]); + return answer[0] instanceof ExceptionResponse && ((ExceptionResponse)answer[0]).getException() instanceof TransportDisposedIOException; + } + })); + + local.stop(); + } + @Test(timeout=60000) public void testMultipleStartsAndStops() throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java index 457cb092f1..e29c078e16 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VmTransportNetworkBrokerTest.java @@ -109,7 +109,7 @@ public class VmTransportNetworkBrokerTest extends TestCase { originalThreadCount + " threadCountAfterStop=" + threadCountAfterStop, - threadCountAfterStop == originalThreadCount); + threadCountAfterStop <= originalThreadCount); }