ARTEMIS-426 - java.lang.IllegalStateException: AMQ119116: Netty Acceptor unavailable

also related to https://issues.apache.org/jira/browse/ARTEMIS-416

https://issues.apache.org/jira/browse/ARTEMIS-426
This commit is contained in:
Andy Taylor 2016-03-01 14:16:34 +00:00
parent e40a5ead42
commit a3962d6d26
4 changed files with 28 additions and 5 deletions

View File

@ -516,6 +516,11 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// this is just a debug, since an interrupt is an expected event (in case of a shutdown) // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
ActiveMQClientLogger.LOGGER.debug(e1.getMessage(), e1); ActiveMQClientLogger.LOGGER.debug(e1.getMessage(), e1);
} }
catch (Throwable t) {
//for anything else just close so clients are un blocked
close();
throw t;
}
} }
/** /**
@ -902,18 +907,19 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
//we check if we can actually connect. //we check if we can actually connect.
// we do it here as to receive the reply connection has to be not null // we do it here as to receive the reply connection has to be not null
//make sure to reset this.connection == null
if (connection != null && liveNodeID != null) { if (connection != null && liveNodeID != null) {
try { try {
if (!clientProtocolManager.checkForFailover(liveNodeID)) { if (!clientProtocolManager.checkForFailover(liveNodeID)) {
connection.destroy(); connection.destroy();
connection = null; this.connection = null;
return null;
} }
} }
catch (ActiveMQException e) { catch (ActiveMQException e) {
if (connection != null) {
connection.destroy(); connection.destroy();
connection = null; this.connection = null;
} return null;
} }
} }

View File

@ -665,6 +665,7 @@ public class NettyConnector extends AbstractConnector {
ch.writeAndFlush(request); ch.writeAndFlush(request);
if (!httpUpgradeHandler.awaitHandshake()) { if (!httpUpgradeHandler.awaitHandshake()) {
ch.close().awaitUninterruptibly();
return null; return null;
} }
} }

View File

@ -76,6 +76,11 @@ public interface RemotingService {
*/ */
void pauseAcceptors(); void pauseAcceptors();
/**
* Pauses the acceptors so that no more connections can be made to the server
*/
boolean isPaused();
/** /**
* Freezes and then disconnects all connections except the given one and tells the client where else * Freezes and then disconnects all connections except the given one and tells the client where else
* it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true). * it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true).

View File

@ -113,6 +113,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
private ServiceRegistry serviceRegistry; private ServiceRegistry serviceRegistry;
private boolean paused = false;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
@ -192,6 +194,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
return; return;
} }
paused = false;
// The remoting service maintains it's own thread pool for handling remoting traffic // The remoting service maintains it's own thread pool for handling remoting traffic
// If OIO each connection will have it's own thread // If OIO each connection will have it's own thread
// If NIO these are capped at nio-remoting-threads which defaults to num cores * 3 // If NIO these are capped at nio-remoting-threads which defaults to num cores * 3
@ -316,6 +320,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
if (!started) if (!started)
return; return;
paused = true;
for (Acceptor acceptor : acceptors.values()) { for (Acceptor acceptor : acceptors.values()) {
try { try {
acceptor.pause(); acceptor.pause();
@ -326,6 +332,11 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
} }
} }
@Override
public synchronized boolean isPaused() {
return paused;
}
@Override @Override
public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) { public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) {
if (!started) if (!started)