https://issues.apache.org/jira/browse/AMQ-4532 - fix and test - ensure disposed exception is propagated to clients on vm server shutdown

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1482117 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-05-13 21:43:39 +00:00
parent ce67624cd4
commit 58b9a83fe8
3 changed files with 52 additions and 1 deletions

View File

@ -200,6 +200,12 @@ public class VMTransport implements Transport, Task {
} catch (Exception ignore) {
}
// let any requests pending a response see an exception
try {
peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
} catch (Exception ignore) {
}
// shutdown task runner factory
if (taskRunnerFactory != null) {
taskRunnerFactory.shutdownNow();

View File

@ -29,10 +29,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
@ -283,6 +287,47 @@ public class VMTransportThreadSafeTest {
}));
}
@Test(timeout=60000)
public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));
final VMTransport remote = new VMTransport(new URI(location2));
local.setPeer(remote);
remote.setPeer(local);
final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
remote.setTransportListener(remoteListener);
final Response[] answer = new Response[1];
ResponseCorrelator responseCorrelator = new ResponseCorrelator(local);
responseCorrelator.setTransportListener(new VMTestTransportListener(localReceived));
responseCorrelator.start();
responseCorrelator.asyncRequest(new DummyCommand(), new ResponseCallback() {
@Override
public void onCompletion(FutureResponse resp) {
try {
answer[0] = resp.getResult();
} catch (IOException e) {
e.printStackTrace();
}
}
});
// simulate broker stop
remote.stop();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("answer: " + answer[0]);
return answer[0] instanceof ExceptionResponse && ((ExceptionResponse)answer[0]).getException() instanceof TransportDisposedIOException;
}
}));
local.stop();
}
@Test(timeout=60000)
public void testMultipleStartsAndStops() throws Exception {

View File

@ -109,7 +109,7 @@ public class VmTransportNetworkBrokerTest extends TestCase {
originalThreadCount +
" threadCountAfterStop=" +
threadCountAfterStop,
threadCountAfterStop == originalThreadCount);
threadCountAfterStop <= originalThreadCount);
}