Add a connect check in the inactivity monitor to account for opened
connections that might drop but not be spotted, in the case where the
connect frame is lost this can lead to connections that aren't fully
opened and won't be cleaned up until the broker detects the socket has
failed.

By default the connection timer is set to 30 seconds, if no connect
frame is read by then the connection is dropped.  The broker can be
configured via the 'transport.connectAttemptTimeout' URI option, a value
This commit is contained in:
Timothy Bish 2015-02-13 17:07:33 -05:00
parent 1cab713864
commit 8b36701fc3
11 changed files with 410 additions and 18 deletions

View File

@ -66,7 +66,10 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
}
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.thread.SchedulerTimerTask;
import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class);
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static Timer ACTIVITY_CHECK_TIMER;
private final AtomicBoolean failed = new AtomicBoolean(false);
private IAmqpProtocolConverter protocolConverter;
private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
private SchedulerTimerTask connectCheckerTask;
private final Runnable connectChecker = new Runnable() {
private final long startTime = System.currentTimeMillis();
@Override
public void run() {
long now = System.currentTimeMillis();
if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No connection attempt made in time for " + AmqpInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + (connectionTimeout) + ") long: "
+ next.getRemoteAddress()));
}
});
}
}
};
public AmqpInactivityMonitor(Transport next, WireFormat wireFormat) {
super(next);
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
stopConnectChecker();
next.stop();
}
@Override
public void onException(IOException error) {
if (failed.compareAndSet(false, true)) {
stopConnectChecker();
if (protocolConverter != null) {
protocolConverter.onAMQPException(error);
}
transportListener.onException(error);
}
}
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
}
public IAmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
}
synchronized void startConnectChecker(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
if (connectionTimeout > 0 && connectCheckerTask == null) {
connectCheckerTask = new SchedulerTimerTask(connectChecker);
long connectionCheckInterval = Math.min(connectionTimeout, 1000);
synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
ASYNC_TASKS = createExecutor();
ACTIVITY_CHECK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
}
CHECKER_COUNTER++;
ACTIVITY_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
}
}
}
synchronized void stopConnectChecker() {
if (connectCheckerTask != null) {
connectCheckerTask.cancel();
connectCheckerTask = null;
synchronized (AbstractInactivityMonitor.class) {
ACTIVITY_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
ACTIVITY_CHECK_TIMER.cancel();
ACTIVITY_CHECK_TIMER = null;
ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
}
}
}
}
private final ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "AmqpInactivityMonitor Async Task: " + runnable);
thread.setDaemon(true);
return thread;
}
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}
}

View File

@ -26,13 +26,9 @@ import javax.net.SocketFactory;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpNioSslTransport extends NIOSSLTransport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class);
private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {

View File

@ -86,13 +86,16 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
}

View File

@ -141,6 +141,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
this.amqpTransport = transport;
AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
if (monitor != null) {
monitor.setProtocolConverter(this);
}
this.amqpWireFormat = transport.getWireFormat();
this.brokerService = brokerService;
@ -513,7 +517,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
// configureInactivityMonitor(connect.keepAlive());
configureInactivityMonitor();
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
@ -578,6 +583,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
private void configureInactivityMonitor() {
AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
if (monitor == null) {
return;
}
monitor.stopConnectChecker();
}
InboundTransformer inboundTransformer;
protected InboundTransformer getInboundTransformer() {
@ -648,7 +662,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final ActiveMQDestination destination;
private boolean closed;
private final boolean anonymous;
private MessageId lastDispatched;
public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
this.producerId = producerId;

View File

@ -50,4 +50,7 @@ public interface AmqpTransport {
public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
public void setInactivityMonitor(AmqpInactivityMonitor monitor);
public AmqpInactivityMonitor getInactivityMonitor();
}

View File

@ -66,7 +66,10 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
}
@Override
protected boolean isUseInactivityMonitor(Transport transport) {
return false;
protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
filter.setInactivityMonitor(monitor);
return monitor;
}
}

View File

@ -44,6 +44,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
private IAmqpProtocolConverter protocolConverter;
private AmqpWireFormat wireFormat;
private AmqpInactivityMonitor monitor;
private boolean trace;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
@ -57,6 +58,15 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public void start() throws Exception {
if (monitor != null) {
monitor.setProtocolConverter(protocolConverter);
monitor.startConnectChecker(getConnectAttemptTimeout());
}
super.start();
}
@Override
public void oneway(Object o) throws IOException {
try {
@ -184,4 +194,22 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
public void setProducerCredit(int producerCredit) {
protocolConverter.setProducerCredit(producerCredit);
}
@Override
public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
this.monitor = monitor;
}
@Override
public AmqpInactivityMonitor getInactivityMonitor() {
return monitor;
}
public long getConnectAttemptTimeout() {
return wireFormat.getConnectAttemptTimeout();
}
public void setConnectAttemptTimeout(long connectAttemptTimeout) {
wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
}
}

View File

@ -36,11 +36,14 @@ public class AmqpWireFormat implements WireFormat {
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
private static final int SASL_PROTOCOL = 3;
private int version = 1;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
private boolean magicRead = false;
private ResetListener resetListener;
@ -196,4 +199,12 @@ public class AmqpWireFormat implements WireFormat {
public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
this.allowNonSaslConnections = allowNonSaslConnections;
}
public long getConnectAttemptTimeout() {
return connectAttemptTimeout;
}
public void setConnectAttemptTimeout(long connectAttemptTimeout) {
this.connectAttemptTimeout = connectAttemptTimeout;
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collection;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test that connection attempts that don't send the connect performative
* get cleaned up by the inactivity monitor.
*/
@RunWith(Parameterized.class)
public class AmqpConnectTimeoutTest extends AmqpTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectTimeoutTest.class);
private Socket connection;
protected boolean useSSL;
protected String connectorScheme;
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"amqp", false},
{"amqp+ssl", true},
{"amqp+nio", false},
{"amqp+nio+ssl", true}
});
}
public AmqpConnectTimeoutTest(String connectorScheme, boolean useSSL) {
this.connectorScheme = connectorScheme;
this.useSSL = useSSL;
}
protected String getConnectorScheme() {
return connectorScheme;
}
protected boolean isUseSSL() {
return useSSL;
}
@Override
protected boolean isUseSslConnector() {
return true;
}
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Override
@After
public void tearDown() throws Exception {
if (connection != null) {
try {
connection.close();
} catch (Throwable e) {}
connection = null;
}
super.tearDown();
}
@Override
public String getAdditionalConfig() {
return "&transport.connectAttemptTimeout=2000";
}
@Test(timeout = 60 * 1000)
public void testInactivityMonitor() throws Exception {
Thread t1 = new Thread() {
@Override
public void run() {
try {
connection = createConnection();
connection.getOutputStream().write('A');
connection.getOutputStream().flush();
} catch (Exception ex) {
LOG.error("unexpected exception on connect/disconnect", ex);
exceptions.add(ex);
}
}
};
t1.start();
assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
}
}));
// and it should be closed due to inactivity
assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
}
}));
assertTrue("no exceptions", exceptions.isEmpty());
}
protected Socket createConnection() throws IOException {
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.port;
break;
case "amqp+ssl":
port = this.sslPort;
break;
case "amqp+nio":
port = this.nioPort;
break;
case "amqp+nio+ssl":
port = this.nioPlusSslPort;
break;
default:
throw new IOException("Invalid AMQP connector scheme passed to test.");
}
if (isUseSSL()) {
return SSLSocketFactory.getDefault().createSocket("localhost", port);
} else {
return new Socket("localhost", port);
}
}
}

View File

@ -134,25 +134,25 @@ public class AmqpTestSupport {
}
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer());
"amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
port = connector.getConnectUri().getPort();
LOG.debug("Using amqp port " + port);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer());
"amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
sslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+ssl port " + sslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer());
"amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
nioPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio port " + nioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer());
"amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
nioPlusSslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort);
}
@ -182,6 +182,10 @@ public class AmqpTestSupport {
return "jms";
}
protected String getAdditionalConfig() {
return "";
}
public void startBroker() throws Exception {
if (brokerService != null) {
throw new IllegalStateException("Broker is already created.");