mirror of
https://github.com/apache/activemq.git
synced 2025-03-01 05:39:09 +00:00
rework https://issues.apache.org/jira/browse/AMQ-3684 and https://issues.apache.org/jira/browse/AMQ-4532 to avoid intermittent hangs, processing shutdown wile shutdown is in progress - AMQ1936Test and AMQ2021Test - using just TransportDisposedIOException to propagate exception response and start shutdown process and ignoring broker side for logging
This commit is contained in:
parent
c6837acefe
commit
8cf98a070f
@ -239,11 +239,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
|||||||
}
|
}
|
||||||
if (!stopping.get() && !pendingStop) {
|
if (!stopping.get() && !pendingStop) {
|
||||||
transportException.set(e);
|
transportException.set(e);
|
||||||
|
if (! (e instanceof TransportDisposedIOException)) {
|
||||||
if (TRANSPORTLOG.isDebugEnabled()) {
|
if (TRANSPORTLOG.isDebugEnabled()) {
|
||||||
TRANSPORTLOG.debug(this + " failed: " + e, e);
|
TRANSPORTLOG.debug(this + " failed: " + e, e);
|
||||||
} else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
|
} else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
|
||||||
TRANSPORTLOG.warn(this + " failed: " + e);
|
TRANSPORTLOG.warn(this + " failed: " + e);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
stopAsync();
|
stopAsync();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -194,15 +194,7 @@ public class VMTransport implements Transport, Task {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (peer.transportListener != null) {
|
if (peer.transportListener != null) {
|
||||||
// let the peer know that we are disconnecting after attempting
|
// let any requests pending a response see an exception and shutdown
|
||||||
// to cleanly shutdown the async tasks so that this is the last
|
|
||||||
// command it see's.
|
|
||||||
try {
|
|
||||||
peer.transportListener.onCommand(new ShutdownInfo());
|
|
||||||
} catch (Exception ignore) {
|
|
||||||
}
|
|
||||||
|
|
||||||
// let any requests pending a response see an exception
|
|
||||||
try {
|
try {
|
||||||
peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
|
peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
|
||||||
} catch (Exception ignore) {
|
} catch (Exception ignore) {
|
||||||
|
@ -173,7 +173,7 @@ public abstract class TestSupport extends CombinationTestSupport {
|
|||||||
regionBroker.getTopicRegion().getDestinationMap();
|
regionBroker.getTopicRegion().getDestinationMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM };
|
public static enum PersistenceAdapterChoice {LevelDB, KahaDB, JDBC, MEM };
|
||||||
|
|
||||||
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
||||||
return setPersistenceAdapter(broker, defaultPersistenceAdapter);
|
return setPersistenceAdapter(broker, defaultPersistenceAdapter);
|
||||||
|
@ -75,7 +75,7 @@ public class AMQ2902Test extends TestCase {
|
|||||||
|
|
||||||
public void testNoExceptionOnClose() throws JMSException {
|
public void testNoExceptionOnClose() throws JMSException {
|
||||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
|
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
|
||||||
"vm://localhost?broker.persistent=false");
|
"vm://localhostTwo?broker.persistent=false");
|
||||||
Connection connection = connectionFactory.createConnection();
|
Connection connection = connectionFactory.createConnection();
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
@ -262,31 +262,6 @@ public class VMTransportThreadSafeTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
|
||||||
public void testStopSendsShutdownToPeer() throws Exception {
|
|
||||||
|
|
||||||
final VMTransport local = new VMTransport(new URI(location1));
|
|
||||||
final VMTransport remote = new VMTransport(new URI(location2));
|
|
||||||
|
|
||||||
local.setPeer(remote);
|
|
||||||
remote.setPeer(local);
|
|
||||||
|
|
||||||
final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
|
|
||||||
|
|
||||||
local.setTransportListener(new VMTestTransportListener(localReceived));
|
|
||||||
remote.setTransportListener(remoteListener);
|
|
||||||
|
|
||||||
local.start();
|
|
||||||
local.stop();
|
|
||||||
|
|
||||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
|
||||||
@Override
|
|
||||||
public boolean isSatisified() throws Exception {
|
|
||||||
return remoteListener.shutdownReceived;
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
|
public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user