diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java index 8cf64882f0..8e6f60d8b2 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java @@ -30,7 +30,6 @@ import org.apache.activemq.transport.AbstractInactivityMonitor; import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; -import org.apache.activemq.util.ThreadPoolUtils; import org.apache.activemq.wireformat.WireFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +39,16 @@ public class AmqpInactivityMonitor extends TransportFilter { private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class); private static ThreadPoolExecutor ASYNC_TASKS; - private static int CHECKER_COUNTER; - private static Timer ACTIVITY_CHECK_TIMER; + private static int CONNECTION_CHECK_TASK_COUNTER; + private static Timer CONNECTION_CHECK_TASK_TIMER; + private static int KEEPALIVE_TASK_COUNTER; + private static Timer KEEPALIVE_TASK_TIMER; private final AtomicBoolean failed = new AtomicBoolean(false); - private AmqpProtocolConverter protocolConverter; + private AmqpTransport amqpTransport; private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT; + private SchedulerTimerTask connectCheckerTask; private final Runnable connectChecker = new Runnable() { @@ -54,18 +56,44 @@ public class AmqpInactivityMonitor extends TransportFilter { @Override public void run() { - long now = System.currentTimeMillis(); if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isTerminating()) { - if (LOG.isDebugEnabled()) { - LOG.debug("No connection attempt made in time for " + AmqpInactivityMonitor.this.toString() + "! Throwing InactivityIOException."); - } + LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this.toString()); ASYNC_TASKS.execute(new Runnable() { @Override public void run() { - onException(new InactivityIOException("Channel was inactive for too (>" + (connectionTimeout) + ") long: " - + next.getRemoteAddress())); + onException(new InactivityIOException( + "Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress())); + } + }); + } + } + }; + + private SchedulerTimerTask keepAliveTask; + private final Runnable keepAlive = new Runnable() { + + @Override + public void run() { + if (keepAliveTask != null && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) { + ASYNC_TASKS.execute(new Runnable() { + @Override + public void run() { + try { + long nextIdleUpdate = amqpTransport.keepAlive(); + if (nextIdleUpdate > 0) { + synchronized (AmqpInactivityMonitor.this) { + if (keepAliveTask != null) { + keepAliveTask = new SchedulerTimerTask(keepAlive); + KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate); + } + } + } + } catch (Exception ex) { + onException(new InactivityIOException( + "Exception while performing idle checks for connection: " + next.getRemoteAddress())); + } } }); } @@ -83,30 +111,31 @@ public class AmqpInactivityMonitor extends TransportFilter { @Override public void stop() throws Exception { - stopConnectChecker(); + stopConnectionTimeoutChecker(); + stopKeepAliveTask(); next.stop(); } @Override public void onException(IOException error) { if (failed.compareAndSet(false, true)) { - stopConnectChecker(); - if (protocolConverter != null) { - protocolConverter.onAMQPException(error); + stopConnectionTimeoutChecker(); + if (amqpTransport != null) { + amqpTransport.onException(error); } transportListener.onException(error); } } - public void setProtocolConverter(AmqpProtocolConverter protocolConverter) { - this.protocolConverter = protocolConverter; + public void setAmqpTransport(AmqpTransport amqpTransport) { + this.amqpTransport = amqpTransport; } - public AmqpProtocolConverter getProtocolConverter() { - return protocolConverter; + public AmqpTransport getAmqpTransport() { + return amqpTransport; } - public synchronized void startConnectChecker(long connectionTimeout) { + public synchronized void startConnectionTimeoutChecker(long connectionTimeout) { this.connectionTimeout = connectionTimeout; if (connectionTimeout > 0 && connectCheckerTask == null) { connectCheckerTask = new SchedulerTimerTask(connectChecker); @@ -114,29 +143,68 @@ public class AmqpInactivityMonitor extends TransportFilter { long connectionCheckInterval = Math.min(connectionTimeout, 1000); synchronized (AbstractInactivityMonitor.class) { - if (CHECKER_COUNTER == 0) { - ASYNC_TASKS = createExecutor(); - ACTIVITY_CHECK_TIMER = new Timer("AMQP InactivityMonitor State Check", true); + if (CONNECTION_CHECK_TASK_COUNTER == 0) { + if (ASYNC_TASKS == null) { + ASYNC_TASKS = createExecutor(); + } + CONNECTION_CHECK_TASK_TIMER = new Timer("AMQP InactivityMonitor State Check", true); } - CHECKER_COUNTER++; - ACTIVITY_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval); + CONNECTION_CHECK_TASK_COUNTER++; + CONNECTION_CHECK_TASK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval); } } } - public synchronized void stopConnectChecker() { + /** + * Starts the keep alive task which will run after the given delay. + * + * @param nextKeepAliveCheck + * time in milliseconds to wait before performing the next keep-alive check. + */ + public synchronized void startKeepAliveTask(long nextKeepAliveCheck) { + if (nextKeepAliveCheck > 0 && keepAliveTask == null) { + keepAliveTask = new SchedulerTimerTask(keepAlive); + + synchronized (AbstractInactivityMonitor.class) { + if (KEEPALIVE_TASK_COUNTER == 0) { + if (ASYNC_TASKS == null) { + ASYNC_TASKS = createExecutor(); + } + KEEPALIVE_TASK_TIMER = new Timer("AMQP InactivityMonitor Idle Update", true); + } + KEEPALIVE_TASK_COUNTER++; + KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextKeepAliveCheck); + } + } + } + + public synchronized void stopConnectionTimeoutChecker() { if (connectCheckerTask != null) { connectCheckerTask.cancel(); connectCheckerTask = null; synchronized (AbstractInactivityMonitor.class) { - ACTIVITY_CHECK_TIMER.purge(); - CHECKER_COUNTER--; - if (CHECKER_COUNTER == 0) { - ACTIVITY_CHECK_TIMER.cancel(); - ACTIVITY_CHECK_TIMER = null; - ThreadPoolUtils.shutdown(ASYNC_TASKS); - ASYNC_TASKS = null; + CONNECTION_CHECK_TASK_TIMER.purge(); + CONNECTION_CHECK_TASK_COUNTER--; + if (CONNECTION_CHECK_TASK_COUNTER == 0) { + CONNECTION_CHECK_TASK_TIMER.cancel(); + CONNECTION_CHECK_TASK_TIMER = null; + } + } + } + } + + public synchronized void stopKeepAliveTask() { + if (keepAliveTask != null) { + keepAliveTask.cancel(); + keepAliveTask = null; + + synchronized (AbstractInactivityMonitor.class) { + KEEPALIVE_TASK_TIMER.purge(); + KEEPALIVE_TASK_COUNTER--; + if (KEEPALIVE_TASK_COUNTER == 0) { + KEEPALIVE_TASK_TIMER.cancel(); + KEEPALIVE_TASK_TIMER = null; } } } @@ -152,7 +220,7 @@ public class AmqpInactivityMonitor extends TransportFilter { }; private ThreadPoolExecutor createExecutor() { - ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), factory); + ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 90, TimeUnit.SECONDS, new SynchronousQueue(), factory); exec.allowCoreThreadTimeOut(true); return exec; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 5c33ed1804..6b9a178dd4 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -65,4 +65,15 @@ public interface AmqpProtocolConverter { */ void updateTracer(); + /** + * Perform any keep alive processing for the connection such as sending + * empty frames or closing connections due to remote end being inactive + * for to long. + * + * @returns the amount of milliseconds to wait before performaing another check. + * + * @throws IOException if an error occurs on writing heatbeats to the wire. + */ + long keepAlive() throws IOException; + } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java index 85f44ec09d..9ae478705b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolDiscriminator.java @@ -111,4 +111,9 @@ public class AmqpProtocolDiscriminator implements AmqpProtocolConverter { @Override public void updateTracer() { } + + @Override + public long keepAlive() { + return 0; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java index 1972e18eb8..2ea2d1ba20 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -54,4 +54,8 @@ public interface AmqpTransport { public AmqpInactivityMonitor getInactivityMonitor(); + public boolean isUseInactivityMonitor(); + + public long keepAlive(); + } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index c65145acba..9ca19b1269 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -59,8 +59,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor @Override public void start() throws Exception { if (monitor != null) { - monitor.setProtocolConverter(protocolConverter); - monitor.startConnectChecker(getConnectAttemptTimeout()); + monitor.setAmqpTransport(this); + monitor.startConnectionTimeoutChecker(getConnectAttemptTimeout()); } super.start(); } @@ -135,6 +135,26 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } } + @Override + public long keepAlive() { + long nextKeepAliveDelay = 0l; + + try { + lock.lock(); + try { + nextKeepAliveDelay = protocolConverter.keepAlive(); + } finally { + lock.unlock(); + } + } catch (IOException e) { + handleException(e); + } catch (Exception e) { + onException(IOExceptionSupport.create(e)); + } + + return nextKeepAliveDelay; + } + @Override public X509Certificate[] getPeerCertificates() { if (next instanceof SslTransport) { @@ -210,11 +230,16 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor return monitor; } - public long getConnectAttemptTimeout() { + @Override + public boolean isUseInactivityMonitor() { + return monitor != null; + } + + public int getConnectAttemptTimeout() { return wireFormat.getConnectAttemptTimeout(); } - public void setConnectAttemptTimeout(long connectAttemptTimeout) { + public void setConnectAttemptTimeout(int connectAttemptTimeout) { wireFormat.setConnectAttemptTimeout(connectAttemptTimeout); } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 570fd2b754..5d261f9c85 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -37,7 +37,8 @@ public class AmqpWireFormat implements WireFormat { public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; public static final int NO_AMQP_MAX_FRAME_SIZE = -1; - public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L; + public static final int DEFAULT_CONNECTION_TIMEOUT = 30000; + public static final int DEFAULT_IDLE_TIMEOUT = 30000; public static final int DEFAULT_PRODUCER_CREDIT = 1000; private static final int SASL_PROTOCOL = 3; @@ -45,7 +46,8 @@ public class AmqpWireFormat implements WireFormat { private int version = 1; private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; - private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT; + private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT; + private int idelTimeout = DEFAULT_IDLE_TIMEOUT; private int producerCredit = DEFAULT_PRODUCER_CREDIT; private String transformer = InboundTransformer.TRANSFORMER_NATIVE; @@ -206,11 +208,11 @@ public class AmqpWireFormat implements WireFormat { this.allowNonSaslConnections = allowNonSaslConnections; } - public long getConnectAttemptTimeout() { + public int getConnectAttemptTimeout() { return connectAttemptTimeout; } - public void setConnectAttemptTimeout(long connectAttemptTimeout) { + public void setConnectAttemptTimeout(int connectAttemptTimeout) { this.connectAttemptTimeout = connectAttemptTimeout; } @@ -229,4 +231,12 @@ public class AmqpWireFormat implements WireFormat { public void setTransformer(String transformer) { this.transformer = transformer; } + + public int getIdleTimeout() { + return idelTimeout; + } + + public void setIdleTimeout(int idelTimeout) { + this.idelTimeout = idelTimeout; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index c0ea6ad2d4..d7439432e8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -26,6 +26,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.contains; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +56,7 @@ import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.ShutdownInfo; +import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.AmqpHeader; import org.apache.activemq.transport.amqp.AmqpInactivityMonitor; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; @@ -74,6 +76,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; @@ -123,7 +126,7 @@ public class AmqpConnection implements AmqpProtocolConverter { AmqpInactivityMonitor monitor = transport.getInactivityMonitor(); if (monitor != null) { - monitor.setProtocolConverter(this); + monitor.setAmqpTransport(amqpTransport); } this.amqpWireFormat = transport.getWireFormat(); @@ -200,6 +203,28 @@ public class AmqpConnection implements AmqpProtocolConverter { } } + @Override + public long keepAlive() throws IOException { + long rescheduleAt = 0l; + + LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress()); + + if (protonConnection.getLocalState() != EndpointState.CLOSED) { + rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); + pumpProtonToSocket(); + if (protonTransport.isClosed()) { + rescheduleAt = 0; + LOG.debug("Transport closed after inactivity check."); + throw new InactivityIOException("Channel was inactive for to long"); + } + } + + LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.", + amqpTransport.getRemoteAddress(), rescheduleAt); + + return rescheduleAt; + } + //----- Connection Properties Accessors ----------------------------------// /** @@ -281,6 +306,11 @@ public class AmqpConnection implements AmqpProtocolConverter { frame = (Buffer) command; } + if (protonTransport.isClosed()) { + LOG.debug("Ignoring incoming AMQP data, transport is closed."); + return; + } + while (frame.length > 0) { try { int count = protonTransport.input(frame.data, frame.offset, frame.length); @@ -357,11 +387,11 @@ public class AmqpConnection implements AmqpProtocolConverter { protected void processConnectionOpen(Connection connection) throws Exception { + stopConnectionTimeoutChecker(); + connectionInfo.setResponseRequired(true); connectionInfo.setConnectionId(connectionId); - configureInactivityMonitor(); - String clientId = protonConnection.getRemoteContainer(); if (clientId != null && !clientId.isEmpty()) { connectionInfo.setClientId(clientId); @@ -369,6 +399,20 @@ public class AmqpConnection implements AmqpProtocolConverter { connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); + if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) { + // We cannot meet the requested Idle processing because the inactivity monitor is + // disabled so we won't send idle frames to match the request. + protonConnection.setProperties(getFailedConnetionProperties()); + protonConnection.open(); + protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames")); + protonConnection.close(); + pumpProtonToSocket(); + + amqpTransport.onException(new IOException( + "Connection failed, remote requested idle processing but inactivity monitoring is disbaled.")); + return; + } + sendToActiveMQ(connectionInfo, new ResponseHandler() { @Override public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { @@ -389,9 +433,17 @@ public class AmqpConnection implements AmqpProtocolConverter { protonConnection.close(); } else { + + if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) { + LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout()); + protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout()); + } + protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); protonConnection.setProperties(getConnetionProperties()); protonConnection.open(); + + configureInactivityMonitor(); } } finally { pumpProtonToSocket(); @@ -678,12 +730,28 @@ public class AmqpConnection implements AmqpProtocolConverter { return new SessionId(connectionId, nextSessionId++); } + private void stopConnectionTimeoutChecker() { + AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); + if (monitor != null) { + monitor.stopConnectionTimeoutChecker(); + } + } + private void configureInactivityMonitor() { AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); if (monitor == null) { return; } - monitor.stopConnectChecker(); + // If either end has idle timeout requirements then the tick method + // will give us a deadline on the next time we need to tick() in order + // to meet those obligations. + long nextIdleCheck = protonTransport.tick(System.currentTimeMillis()); + if (nextIdleCheck > 0) { + LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck)); + monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis()); + } else { + LOG.trace("Connection does not require keep-alive processing"); + } } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index fbc4ddae1c..767f792e77 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -99,24 +99,6 @@ public abstract class AmqpAbstractResource implements AmqpRe this.closeRequest = request; doClose(); } -// // If already closed signal success or else the caller might never get notified. -// if (getEndpoint().getLocalState() == EndpointState.CLOSED || -// getEndpoint().getRemoteState() == EndpointState.CLOSED) { -// -// if (getEndpoint().getLocalState() != EndpointState.CLOSED) { -// // Remote already closed this resource, close locally and free. -// if (getEndpoint().getLocalState() != EndpointState.CLOSED) { -// doClose(); -// getEndpoint().free(); -// } -// } -// -// request.onSuccess(); -// return; -// } -// -// this.closeRequest = request; -// doClose(); } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index eafbd1be8e..171a269af8 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.transport.InactivityIOException; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport; @@ -40,6 +41,7 @@ import org.apache.activemq.util.IdGenerator; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; @@ -77,9 +79,11 @@ public class AmqpConnection extends AmqpAbstractResource implements private List offeredCapabilities = Collections.emptyList(); private Map offeredProperties = Collections.emptyMap(); - private AmqpClientListener listener; + private AmqpConnectionListener listener; private SaslAuthenticator authenticator; + private int idleTimeout = 0; + private boolean idleProcessingDisabled; private String containerId; private boolean authenticated; private int channelMax = DEFAULT_CHANNEL_MAX; @@ -127,6 +131,9 @@ public class AmqpConnection extends AmqpAbstractResource implements getEndpoint().setProperties(getOfferedProperties()); } + if (getIdleTimeout() > 0) { + protonTransport.setIdleTimeout(getIdleTimeout()); + } protonTransport.setMaxFrameSize(getMaxFrameSize()); protonTransport.setChannelMax(getChannelMax()); protonTransport.bind(getEndpoint()); @@ -359,6 +366,30 @@ public class AmqpConnection extends AmqpAbstractResource implements return new UnmodifiableConnection(getEndpoint()); } + public AmqpConnectionListener getListener() { + return listener; + } + + public void setListener(AmqpConnectionListener listener) { + this.listener = listener; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + + public void setIdleProcessingDisabled(boolean value) { + this.idleProcessingDisabled = value; + } + + public boolean isIdleProcessingDisabled() { + return idleProcessingDisabled; + } + //----- Internal getters used from the child AmqpResource classes --------// ScheduledExecutorService getScheduler() { @@ -397,6 +428,11 @@ public class AmqpConnection extends AmqpAbstractResource implements ByteBuffer source = input.toByteBuffer(); LOG.trace("Received from Broker {} bytes:", source.remaining()); + if (protonTransport.isClosed()) { + LOG.debug("Ignoring incoming data because transport is closed"); + return; + } + do { ByteBuffer buffer = protonTransport.getInputBuffer(); int limit = Math.min(buffer.remaining(), source.remaining()); @@ -431,6 +467,37 @@ public class AmqpConnection extends AmqpAbstractResource implements protected void doOpenCompletion() { // If the remote indicates that a close is pending, don't open. if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { + + if (!isIdleProcessingDisabled()) { + long nextKeepAliveTime = protonTransport.tick(System.currentTimeMillis()); + if (nextKeepAliveTime > 0) { + + getScheduler().schedule(new Runnable() { + + @Override + public void run() { + try { + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + LOG.debug("Client performing next idle check"); + long rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis(); + pumpToProtonTransport(); + if (protonTransport.isClosed()) { + LOG.debug("Transport closed after inactivity check."); + throw new InactivityIOException("Channel was inactive for to long"); + } + + if (rescheduleAt > 0) { + getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); + } + } + } catch (Exception e) { + transport.close(); + fireClientException(e); + } + } + }, nextKeepAliveTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + } + } super.doOpenCompletion(); } } @@ -446,9 +513,9 @@ public class AmqpConnection extends AmqpAbstractResource implements } protected void fireClientException(Throwable ex) { - AmqpClientListener listener = this.listener; + AmqpConnectionListener listener = this.listener; if (listener != null) { - listener.onClientException(ex); + listener.onException(ex); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java similarity index 93% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java index 3df7cf4f0b..96d26b45c6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientListener.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnectionListener.java @@ -19,7 +19,7 @@ package org.apache.activemq.transport.amqp.client; /** * Events points exposed by the AmqpClient object. */ -public interface AmqpClientListener { +public interface AmqpConnectionListener { /** * Indicates some error has occurred during client operations. @@ -27,6 +27,6 @@ public interface AmqpClientListener { * @param ex * The error that triggered this event. */ - void onClientException(Throwable ex); + void onException(Throwable ex); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java similarity index 88% rename from activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java rename to activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java index 9b2394cb10..66704e41ea 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultClientListener.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpDefaultConnectionListener.java @@ -19,10 +19,10 @@ package org.apache.activemq.transport.amqp.client; /** * Default listener implementation that stubs out all the event methods. */ -public class AmqpDefaultClientListener implements AmqpClientListener { +public class AmqpDefaultConnectionListener implements AmqpConnectionListener { @Override - public void onClientException(Throwable ex) { + public void onException(Throwable ex) { } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java index 58249a763c..158ae0d86a 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java @@ -179,13 +179,12 @@ public class UnmodifiableConnection implements Connection { } @Override - public Transport getTransport() { - return connection.getTransport(); + public String getContainer() { + return connection.getContainer(); } @Override - public String getContainer() - { - return connection.getContainer(); + public Transport getTransport() { + return new UnmodifiableTransport(connection.getTransport()); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java new file mode 100644 index 0000000000..3d05d50f36 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableTransport.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client.util; + +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Ssl; +import org.apache.qpid.proton.engine.SslDomain; +import org.apache.qpid.proton.engine.SslPeerDetails; +import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.TransportException; +import org.apache.qpid.proton.engine.TransportResult; + +/** + * Unmodifiable Transport wrapper used to prevent test code from accidentally + * modifying Transport state. + */ +public class UnmodifiableTransport implements Transport { + + private final Transport transport; + + public UnmodifiableTransport(Transport transport) { + this.transport = transport; + } + + @Override + public void close() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void free() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public Object getContext() { + return null; + } + + @Override + public EndpointState getLocalState() { + return transport.getLocalState(); + } + + @Override + public ErrorCondition getRemoteCondition() { + return transport.getRemoteCondition(); + } + + @Override + public EndpointState getRemoteState() { + return transport.getRemoteState(); + } + + @Override + public void open() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void setCondition(ErrorCondition arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void setContext(Object arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void bind(Connection arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public int capacity() { + return transport.capacity(); + } + + @Override + public void close_head() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void close_tail() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public int getChannelMax() { + return transport.getChannelMax(); + } + + @Override + public ErrorCondition getCondition() { + return transport.getCondition(); + } + + @Override + public int getIdleTimeout() { + return transport.getIdleTimeout(); + } + + @Override + public ByteBuffer getInputBuffer() { + return null; + } + + @Override + public int getMaxFrameSize() { + return transport.getMaxFrameSize(); + } + + @Override + public ByteBuffer getOutputBuffer() { + return null; + } + + @Override + public int getRemoteChannelMax() { + return transport.getRemoteChannelMax(); + } + + @Override + public int getRemoteIdleTimeout() { + return transport.getRemoteIdleTimeout(); + } + + @Override + public int getRemoteMaxFrameSize() { + return transport.getRemoteMaxFrameSize(); + } + + @Override + public ByteBuffer head() { + return null; + } + + @Override + public int input(byte[] arg0, int arg1, int arg2) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public boolean isClosed() { + return transport.isClosed(); + } + + @Override + public int output(byte[] arg0, int arg1, int arg2) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void outputConsumed() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public int pending() { + return transport.pending(); + } + + @Override + public void pop(int arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void process() throws TransportException { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public TransportResult processInput() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public Sasl sasl() throws IllegalStateException { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void setChannelMax(int arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void setIdleTimeout(int arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void setMaxFrameSize(int arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public Ssl ssl(SslDomain arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public ByteBuffer tail() { + return null; + } + + @Override + public long tick(long arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void trace(int arg0) { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } + + @Override + public void unbind() { + throw new UnsupportedOperationException("Cannot alter the Transport"); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java new file mode 100644 index 0000000000..d33a2173d1 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpConnectionListener; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; + +/** + * Test handling of heartbeats requested by the broker. + */ +public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport { + + private final int TEST_IDLE_TIMEOUT = 3000; + + @Override + protected String getAdditionalConfig() { + return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT; + } + + @Test(timeout = 60000) + public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals(TEST_IDLE_TIMEOUT / 2, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = client.connect(); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testClientWithoutHeartbeatsGetsDropped() throws Exception { + + final CountDownLatch disconnected = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + assertNotNull(connection); + + connection.setIdleProcessingDisabled(true); + connection.setListener(new AmqpConnectionListener() { + + @Override + public void onException(Throwable ex) { + disconnected.countDown(); + } + }); + + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + assertTrue(disconnected.await(30, TimeUnit.SECONDS)); + + connection.close(); + + assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testClientWithHeartbeatsStaysAlive() throws Exception { + + final CountDownLatch disconnected = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + assertNotNull(connection); + + connection.setListener(new AmqpConnectionListener() { + + @Override + public void onException(Throwable ex) { + disconnected.countDown(); + } + }); + + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + + connection.close(); + + assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java new file mode 100644 index 0000000000..c7ab0cddca --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpConnectionListener; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.activemq.util.Wait; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; + +/** + * Tests that cover broker behavior when the client requests heartbeats + */ +public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport { + + private final int TEST_IDLE_TIMEOUT = 3000; + + @Override + protected String getAdditionalConfig() { + return "&transport.wireFormat.idleTimeout=0"; + } + + @Test(timeout = 60000) + public void testBrokerWitZeroIdleTimeDoesNotAdvertise() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals(0, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = client.connect(); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testBrokerSendsRequestedHeartbeats() throws Exception { + + final CountDownLatch disconnected = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + connection.setIdleTimeout(TEST_IDLE_TIMEOUT); + assertNotNull(connection); + + connection.setListener(new AmqpConnectionListener() { + + @Override + public void onException(Throwable ex) { + disconnected.countDown(); + } + }); + + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + assertFalse(disconnected.await(10, TimeUnit.SECONDS)); + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + connection.close(); + + assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDisabledInactivityMonitorTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDisabledInactivityMonitorTest.java new file mode 100644 index 0000000000..572442209a --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDisabledInactivityMonitorTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; + +/** + * Test broker behaviors around Idle timeout when the inactivity monitor is disabled. + */ +public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport { + + private final int TEST_IDLE_TIMEOUT = 3000; + + @Override + protected String getAdditionalConfig() { + return "&transport.useInactivityMonitor=false&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT; + } + + @Test(timeout = 60000) + public void testBrokerDoesNotRequestIdleTimeout() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + client.setValidator(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + assertEquals(0, connection.getTransport().getRemoteIdleTimeout()); + } + }); + + AmqpConnection connection = client.connect(); + assertNotNull(connection); + + connection.getStateInspector().assertValid(); + connection.close(); + } + + @Test(timeout = 60000) + public void testClientWithIdleTimeoutIsRejected() throws Exception { + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + connection.setIdleTimeout(TEST_IDLE_TIMEOUT); + assertNotNull(connection); + + try { + connection.connect(); + fail("Connection should be rejected when idle frames can't be met."); + } catch (Exception ex) { + } + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSocketProxyIdleTimeoutTests.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSocketProxyIdleTimeoutTests.java new file mode 100644 index 0000000000..4d04f75dd9 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSocketProxyIdleTimeoutTests.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpConnectionListener; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for idle timeout processing using SocketProxy to interrupt coms. + */ +public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport { + + private final int TEST_IDLE_TIMEOUT = 3000; + + private SocketProxy socketProxy; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + socketProxy = new SocketProxy(super.getBrokerAmqpConnectionURI()); + } + + @Override + @After + public void tearDown() throws Exception { + if (socketProxy != null) { + socketProxy.close(); + socketProxy = null; + } + + super.tearDown(); + } + + @Override + public URI getBrokerAmqpConnectionURI() { + return socketProxy.getUrl(); + } + + @Override + protected String getAdditionalConfig() { + return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT; + } + + @Test(timeout = 60000) + public void testBrokerSendsRequestedHeartbeats() throws Exception { + + final CountDownLatch disconnected = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + connection.setIdleTimeout(TEST_IDLE_TIMEOUT); + assertNotNull(connection); + + connection.setListener(new AmqpConnectionListener() { + + @Override + public void onException(Throwable ex) { + disconnected.countDown(); + } + }); + + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + socketProxy.pause(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + assertTrue(disconnected.await(10, TimeUnit.SECONDS)); + + socketProxy.goOn(); + + connection.close(); + + assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testClientWithoutHeartbeatsGetsDropped() throws Exception { + + final CountDownLatch disconnected = new CountDownLatch(1); + + AmqpClient client = createAmqpClient(); + assertNotNull(client); + + AmqpConnection connection = client.createConnection(); + connection.setCloseTimeout(1000); // Socket will have silently gone away, don't wait to long. + assertNotNull(connection); + + connection.setListener(new AmqpConnectionListener() { + + @Override + public void onException(Throwable ex) { + disconnected.countDown(); + } + }); + + connection.connect(); + + assertEquals(1, getProxyToBroker().getCurrentConnectionsCount()); + + socketProxy.pause(); + + // Client still sends ok but broker doesn't see them. + assertFalse(disconnected.await(5, TimeUnit.SECONDS)); + socketProxy.halfClose(); + assertTrue(disconnected.await(15, TimeUnit.SECONDS)); + socketProxy.close(); + + connection.close(); + + assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return getProxyToBroker().getCurrentConnectionsCount() == 0; + } + })); + } +}