Add inactivity monitoring and heartbeats to the AMQP transport.
This commit is contained in:
Timothy Bish 2015-05-05 18:36:15 -04:00
parent fa81c1ff73
commit 4b4cf7c09e
17 changed files with 1043 additions and 75 deletions

View File

@ -30,7 +30,6 @@ import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,13 +39,16 @@ public class AmqpInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class);
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
private static Timer ACTIVITY_CHECK_TIMER;
private static int CONNECTION_CHECK_TASK_COUNTER;
private static Timer CONNECTION_CHECK_TASK_TIMER;
private static int KEEPALIVE_TASK_COUNTER;
private static Timer KEEPALIVE_TASK_TIMER;
private final AtomicBoolean failed = new AtomicBoolean(false);
private AmqpProtocolConverter protocolConverter;
private AmqpTransport amqpTransport;
private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
private SchedulerTimerTask connectCheckerTask;
private final Runnable connectChecker = new Runnable() {
@ -54,18 +56,44 @@ public class AmqpInactivityMonitor extends TransportFilter {
@Override
public void run() {
long now = System.currentTimeMillis();
if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No connection attempt made in time for " + AmqpInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this.toString());
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
onException(new InactivityIOException("Channel was inactive for too (>" + (connectionTimeout) + ") long: "
+ next.getRemoteAddress()));
onException(new InactivityIOException(
"Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress()));
}
});
}
}
};
private SchedulerTimerTask keepAliveTask;
private final Runnable keepAlive = new Runnable() {
@Override
public void run() {
if (keepAliveTask != null && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
ASYNC_TASKS.execute(new Runnable() {
@Override
public void run() {
try {
long nextIdleUpdate = amqpTransport.keepAlive();
if (nextIdleUpdate > 0) {
synchronized (AmqpInactivityMonitor.this) {
if (keepAliveTask != null) {
keepAliveTask = new SchedulerTimerTask(keepAlive);
KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate);
}
}
}
} catch (Exception ex) {
onException(new InactivityIOException(
"Exception while performing idle checks for connection: " + next.getRemoteAddress()));
}
}
});
}
@ -83,30 +111,31 @@ public class AmqpInactivityMonitor extends TransportFilter {
@Override
public void stop() throws Exception {
stopConnectChecker();
stopConnectionTimeoutChecker();
stopKeepAliveTask();
next.stop();
}
@Override
public void onException(IOException error) {
if (failed.compareAndSet(false, true)) {
stopConnectChecker();
if (protocolConverter != null) {
protocolConverter.onAMQPException(error);
stopConnectionTimeoutChecker();
if (amqpTransport != null) {
amqpTransport.onException(error);
}
transportListener.onException(error);
}
}
public void setProtocolConverter(AmqpProtocolConverter protocolConverter) {
this.protocolConverter = protocolConverter;
public void setAmqpTransport(AmqpTransport amqpTransport) {
this.amqpTransport = amqpTransport;
}
public AmqpProtocolConverter getProtocolConverter() {
return protocolConverter;
public AmqpTransport getAmqpTransport() {
return amqpTransport;
}
public synchronized void startConnectChecker(long connectionTimeout) {
public synchronized void startConnectionTimeoutChecker(long connectionTimeout) {
this.connectionTimeout = connectionTimeout;
if (connectionTimeout > 0 && connectCheckerTask == null) {
connectCheckerTask = new SchedulerTimerTask(connectChecker);
@ -114,29 +143,68 @@ public class AmqpInactivityMonitor extends TransportFilter {
long connectionCheckInterval = Math.min(connectionTimeout, 1000);
synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
if (CONNECTION_CHECK_TASK_COUNTER == 0) {
if (ASYNC_TASKS == null) {
ASYNC_TASKS = createExecutor();
ACTIVITY_CHECK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
}
CHECKER_COUNTER++;
ACTIVITY_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
CONNECTION_CHECK_TASK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
}
CONNECTION_CHECK_TASK_COUNTER++;
CONNECTION_CHECK_TASK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
}
}
}
public synchronized void stopConnectChecker() {
/**
* Starts the keep alive task which will run after the given delay.
*
* @param nextKeepAliveCheck
* time in milliseconds to wait before performing the next keep-alive check.
*/
public synchronized void startKeepAliveTask(long nextKeepAliveCheck) {
if (nextKeepAliveCheck > 0 && keepAliveTask == null) {
keepAliveTask = new SchedulerTimerTask(keepAlive);
synchronized (AbstractInactivityMonitor.class) {
if (KEEPALIVE_TASK_COUNTER == 0) {
if (ASYNC_TASKS == null) {
ASYNC_TASKS = createExecutor();
}
KEEPALIVE_TASK_TIMER = new Timer("AMQP InactivityMonitor Idle Update", true);
}
KEEPALIVE_TASK_COUNTER++;
KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextKeepAliveCheck);
}
}
}
public synchronized void stopConnectionTimeoutChecker() {
if (connectCheckerTask != null) {
connectCheckerTask.cancel();
connectCheckerTask = null;
synchronized (AbstractInactivityMonitor.class) {
ACTIVITY_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
ACTIVITY_CHECK_TIMER.cancel();
ACTIVITY_CHECK_TIMER = null;
ThreadPoolUtils.shutdown(ASYNC_TASKS);
ASYNC_TASKS = null;
CONNECTION_CHECK_TASK_TIMER.purge();
CONNECTION_CHECK_TASK_COUNTER--;
if (CONNECTION_CHECK_TASK_COUNTER == 0) {
CONNECTION_CHECK_TASK_TIMER.cancel();
CONNECTION_CHECK_TASK_TIMER = null;
}
}
}
}
public synchronized void stopKeepAliveTask() {
if (keepAliveTask != null) {
keepAliveTask.cancel();
keepAliveTask = null;
synchronized (AbstractInactivityMonitor.class) {
KEEPALIVE_TASK_TIMER.purge();
KEEPALIVE_TASK_COUNTER--;
if (KEEPALIVE_TASK_COUNTER == 0) {
KEEPALIVE_TASK_TIMER.cancel();
KEEPALIVE_TASK_TIMER = null;
}
}
}
@ -152,7 +220,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
};
private ThreadPoolExecutor createExecutor() {
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 90, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
exec.allowCoreThreadTimeOut(true);
return exec;
}

View File

@ -65,4 +65,15 @@ public interface AmqpProtocolConverter {
*/
void updateTracer();
/**
* Perform any keep alive processing for the connection such as sending
* empty frames or closing connections due to remote end being inactive
* for to long.
*
* @returns the amount of milliseconds to wait before performaing another check.
*
* @throws IOException if an error occurs on writing heatbeats to the wire.
*/
long keepAlive() throws IOException;
}

View File

@ -111,4 +111,9 @@ public class AmqpProtocolDiscriminator implements AmqpProtocolConverter {
@Override
public void updateTracer() {
}
@Override
public long keepAlive() {
return 0;
}
}

View File

@ -54,4 +54,8 @@ public interface AmqpTransport {
public AmqpInactivityMonitor getInactivityMonitor();
public boolean isUseInactivityMonitor();
public long keepAlive();
}

View File

@ -59,8 +59,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
@Override
public void start() throws Exception {
if (monitor != null) {
monitor.setProtocolConverter(protocolConverter);
monitor.startConnectChecker(getConnectAttemptTimeout());
monitor.setAmqpTransport(this);
monitor.startConnectionTimeoutChecker(getConnectAttemptTimeout());
}
super.start();
}
@ -135,6 +135,26 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
}
}
@Override
public long keepAlive() {
long nextKeepAliveDelay = 0l;
try {
lock.lock();
try {
nextKeepAliveDelay = protocolConverter.keepAlive();
} finally {
lock.unlock();
}
} catch (IOException e) {
handleException(e);
} catch (Exception e) {
onException(IOExceptionSupport.create(e));
}
return nextKeepAliveDelay;
}
@Override
public X509Certificate[] getPeerCertificates() {
if (next instanceof SslTransport) {
@ -210,11 +230,16 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
return monitor;
}
public long getConnectAttemptTimeout() {
@Override
public boolean isUseInactivityMonitor() {
return monitor != null;
}
public int getConnectAttemptTimeout() {
return wireFormat.getConnectAttemptTimeout();
}
public void setConnectAttemptTimeout(long connectAttemptTimeout) {
public void setConnectAttemptTimeout(int connectAttemptTimeout) {
wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
}
}

View File

@ -37,7 +37,8 @@ public class AmqpWireFormat implements WireFormat {
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
private static final int SASL_PROTOCOL = 3;
@ -45,7 +46,8 @@ public class AmqpWireFormat implements WireFormat {
private int version = 1;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
private int producerCredit = DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
@ -206,11 +208,11 @@ public class AmqpWireFormat implements WireFormat {
this.allowNonSaslConnections = allowNonSaslConnections;
}
public long getConnectAttemptTimeout() {
public int getConnectAttemptTimeout() {
return connectAttemptTimeout;
}
public void setConnectAttemptTimeout(long connectAttemptTimeout) {
public void setConnectAttemptTimeout(int connectAttemptTimeout) {
this.connectAttemptTimeout = connectAttemptTimeout;
}
@ -229,4 +231,12 @@ public class AmqpWireFormat implements WireFormat {
public void setTransformer(String transformer) {
this.transformer = transformer;
}
public int getIdleTimeout() {
return idelTimeout;
}
public void setIdleTimeout(int idelTimeout) {
this.idelTimeout = idelTimeout;
}
}

View File

@ -26,6 +26,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -55,6 +56,7 @@ import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.AmqpHeader;
import org.apache.activemq.transport.amqp.AmqpInactivityMonitor;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
@ -74,6 +76,7 @@ import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
@ -123,7 +126,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
if (monitor != null) {
monitor.setProtocolConverter(this);
monitor.setAmqpTransport(amqpTransport);
}
this.amqpWireFormat = transport.getWireFormat();
@ -200,6 +203,28 @@ public class AmqpConnection implements AmqpProtocolConverter {
}
}
@Override
public long keepAlive() throws IOException {
long rescheduleAt = 0l;
LOG.trace("Performing connection:{} keep-alive processing", amqpTransport.getRemoteAddress());
if (protonConnection.getLocalState() != EndpointState.CLOSED) {
rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis();
pumpProtonToSocket();
if (protonTransport.isClosed()) {
rescheduleAt = 0;
LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for to long");
}
}
LOG.trace("Connection:{} keep alive processing done, next update in {} milliseconds.",
amqpTransport.getRemoteAddress(), rescheduleAt);
return rescheduleAt;
}
//----- Connection Properties Accessors ----------------------------------//
/**
@ -281,6 +306,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
frame = (Buffer) command;
}
if (protonTransport.isClosed()) {
LOG.debug("Ignoring incoming AMQP data, transport is closed.");
return;
}
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@ -357,11 +387,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
protected void processConnectionOpen(Connection connection) throws Exception {
stopConnectionTimeoutChecker();
connectionInfo.setResponseRequired(true);
connectionInfo.setConnectionId(connectionId);
configureInactivityMonitor();
String clientId = protonConnection.getRemoteContainer();
if (clientId != null && !clientId.isEmpty()) {
connectionInfo.setClientId(clientId);
@ -369,6 +399,20 @@ public class AmqpConnection implements AmqpProtocolConverter {
connectionInfo.setTransportContext(amqpTransport.getPeerCertificates());
if (connection.getTransport().getRemoteIdleTimeout() > 0 && !amqpTransport.isUseInactivityMonitor()) {
// We cannot meet the requested Idle processing because the inactivity monitor is
// disabled so we won't send idle frames to match the request.
protonConnection.setProperties(getFailedConnetionProperties());
protonConnection.open();
protonConnection.setCondition(new ErrorCondition(AmqpError.PRECONDITION_FAILED, "Cannot send idle frames"));
protonConnection.close();
pumpProtonToSocket();
amqpTransport.onException(new IOException(
"Connection failed, remote requested idle processing but inactivity monitoring is disbaled."));
return;
}
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
@ -389,9 +433,17 @@ public class AmqpConnection implements AmqpProtocolConverter {
protonConnection.close();
} else {
if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
}
protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
protonConnection.setProperties(getConnetionProperties());
protonConnection.open();
configureInactivityMonitor();
}
} finally {
pumpProtonToSocket();
@ -678,12 +730,28 @@ public class AmqpConnection implements AmqpProtocolConverter {
return new SessionId(connectionId, nextSessionId++);
}
private void stopConnectionTimeoutChecker() {
AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
if (monitor != null) {
monitor.stopConnectionTimeoutChecker();
}
}
private void configureInactivityMonitor() {
AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
if (monitor == null) {
return;
}
monitor.stopConnectChecker();
// If either end has idle timeout requirements then the tick method
// will give us a deadline on the next time we need to tick() in order
// to meet those obligations.
long nextIdleCheck = protonTransport.tick(System.currentTimeMillis());
if (nextIdleCheck > 0) {
LOG.trace("Connection keep-alive processing starts at: {}", new Date(nextIdleCheck));
monitor.startKeepAliveTask(nextIdleCheck - System.currentTimeMillis());
} else {
LOG.trace("Connection does not require keep-alive processing");
}
}
}

View File

@ -99,24 +99,6 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe
this.closeRequest = request;
doClose();
}
// // If already closed signal success or else the caller might never get notified.
// if (getEndpoint().getLocalState() == EndpointState.CLOSED ||
// getEndpoint().getRemoteState() == EndpointState.CLOSED) {
//
// if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
// // Remote already closed this resource, close locally and free.
// if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
// doClose();
// getEndpoint().free();
// }
// }
//
// request.onSuccess();
// return;
// }
//
// this.closeRequest = request;
// doClose();
}
@Override

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.ClientTcpTransport;
@ -40,6 +41,7 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
@ -77,9 +79,11 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private List<Symbol> offeredCapabilities = Collections.emptyList();
private Map<Symbol, Object> offeredProperties = Collections.emptyMap();
private AmqpClientListener listener;
private AmqpConnectionListener listener;
private SaslAuthenticator authenticator;
private int idleTimeout = 0;
private boolean idleProcessingDisabled;
private String containerId;
private boolean authenticated;
private int channelMax = DEFAULT_CHANNEL_MAX;
@ -127,6 +131,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
getEndpoint().setProperties(getOfferedProperties());
}
if (getIdleTimeout() > 0) {
protonTransport.setIdleTimeout(getIdleTimeout());
}
protonTransport.setMaxFrameSize(getMaxFrameSize());
protonTransport.setChannelMax(getChannelMax());
protonTransport.bind(getEndpoint());
@ -359,6 +366,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
return new UnmodifiableConnection(getEndpoint());
}
public AmqpConnectionListener getListener() {
return listener;
}
public void setListener(AmqpConnectionListener listener) {
this.listener = listener;
}
public int getIdleTimeout() {
return idleTimeout;
}
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
public void setIdleProcessingDisabled(boolean value) {
this.idleProcessingDisabled = value;
}
public boolean isIdleProcessingDisabled() {
return idleProcessingDisabled;
}
//----- Internal getters used from the child AmqpResource classes --------//
ScheduledExecutorService getScheduler() {
@ -397,6 +428,11 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
ByteBuffer source = input.toByteBuffer();
LOG.trace("Received from Broker {} bytes:", source.remaining());
if (protonTransport.isClosed()) {
LOG.debug("Ignoring incoming data because transport is closed");
return;
}
do {
ByteBuffer buffer = protonTransport.getInputBuffer();
int limit = Math.min(buffer.remaining(), source.remaining());
@ -431,6 +467,37 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
protected void doOpenCompletion() {
// If the remote indicates that a close is pending, don't open.
if (!getEndpoint().getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
if (!isIdleProcessingDisabled()) {
long nextKeepAliveTime = protonTransport.tick(System.currentTimeMillis());
if (nextKeepAliveTime > 0) {
getScheduler().schedule(new Runnable() {
@Override
public void run() {
try {
if (getEndpoint().getLocalState() != EndpointState.CLOSED) {
LOG.debug("Client performing next idle check");
long rescheduleAt = protonTransport.tick(System.currentTimeMillis()) - System.currentTimeMillis();
pumpToProtonTransport();
if (protonTransport.isClosed()) {
LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for to long");
}
if (rescheduleAt > 0) {
getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS);
}
}
} catch (Exception e) {
transport.close();
fireClientException(e);
}
}
}, nextKeepAliveTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
super.doOpenCompletion();
}
}
@ -446,9 +513,9 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
protected void fireClientException(Throwable ex) {
AmqpClientListener listener = this.listener;
AmqpConnectionListener listener = this.listener;
if (listener != null) {
listener.onClientException(ex);
listener.onException(ex);
}
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.transport.amqp.client;
/**
* Events points exposed by the AmqpClient object.
*/
public interface AmqpClientListener {
public interface AmqpConnectionListener {
/**
* Indicates some error has occurred during client operations.
@ -27,6 +27,6 @@ public interface AmqpClientListener {
* @param ex
* The error that triggered this event.
*/
void onClientException(Throwable ex);
void onException(Throwable ex);
}

View File

@ -19,10 +19,10 @@ package org.apache.activemq.transport.amqp.client;
/**
* Default listener implementation that stubs out all the event methods.
*/
public class AmqpDefaultClientListener implements AmqpClientListener {
public class AmqpDefaultConnectionListener implements AmqpConnectionListener {
@Override
public void onClientException(Throwable ex) {
public void onException(Throwable ex) {
}
}

View File

@ -179,13 +179,12 @@ public class UnmodifiableConnection implements Connection {
}
@Override
public Transport getTransport() {
return connection.getTransport();
public String getContainer() {
return connection.getContainer();
}
@Override
public String getContainer()
{
return connection.getContainer();
public Transport getTransport() {
return new UnmodifiableTransport(connection.getTransport());
}
}

View File

@ -0,0 +1,248 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client.util;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Ssl;
import org.apache.qpid.proton.engine.SslDomain;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.engine.TransportResult;
/**
* Unmodifiable Transport wrapper used to prevent test code from accidentally
* modifying Transport state.
*/
public class UnmodifiableTransport implements Transport {
private final Transport transport;
public UnmodifiableTransport(Transport transport) {
this.transport = transport;
}
@Override
public void close() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void free() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Object getContext() {
return null;
}
@Override
public EndpointState getLocalState() {
return transport.getLocalState();
}
@Override
public ErrorCondition getRemoteCondition() {
return transport.getRemoteCondition();
}
@Override
public EndpointState getRemoteState() {
return transport.getRemoteState();
}
@Override
public void open() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setCondition(ErrorCondition arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setContext(Object arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void bind(Connection arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int capacity() {
return transport.capacity();
}
@Override
public void close_head() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void close_tail() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int getChannelMax() {
return transport.getChannelMax();
}
@Override
public ErrorCondition getCondition() {
return transport.getCondition();
}
@Override
public int getIdleTimeout() {
return transport.getIdleTimeout();
}
@Override
public ByteBuffer getInputBuffer() {
return null;
}
@Override
public int getMaxFrameSize() {
return transport.getMaxFrameSize();
}
@Override
public ByteBuffer getOutputBuffer() {
return null;
}
@Override
public int getRemoteChannelMax() {
return transport.getRemoteChannelMax();
}
@Override
public int getRemoteIdleTimeout() {
return transport.getRemoteIdleTimeout();
}
@Override
public int getRemoteMaxFrameSize() {
return transport.getRemoteMaxFrameSize();
}
@Override
public ByteBuffer head() {
return null;
}
@Override
public int input(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public boolean isClosed() {
return transport.isClosed();
}
@Override
public int output(byte[] arg0, int arg1, int arg2) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void outputConsumed() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public int pending() {
return transport.pending();
}
@Override
public void pop(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void process() throws TransportException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public TransportResult processInput() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Sasl sasl() throws IllegalStateException {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setChannelMax(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setIdleTimeout(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void setMaxFrameSize(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public Ssl ssl(SslDomain arg0, SslPeerDetails arg1) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public ByteBuffer tail() {
return null;
}
@Override
public long tick(long arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void trace(int arg0) {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
@Override
public void unbind() {
throw new UnsupportedOperationException("Cannot alter the Transport");
}
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
/**
* Test handling of heartbeats requested by the broker.
*/
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;
}
@Test(timeout = 60000)
public void testBrokerSendsHalfConfiguredIdleTimeout() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
assertEquals(TEST_IDLE_TIMEOUT / 2, connection.getTransport().getRemoteIdleTimeout());
}
});
AmqpConnection connection = client.connect();
assertNotNull(connection);
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testClientWithoutHeartbeatsGetsDropped() throws Exception {
final CountDownLatch disconnected = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
assertNotNull(connection);
connection.setIdleProcessingDisabled(true);
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
disconnected.countDown();
}
});
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertTrue(disconnected.await(30, TimeUnit.SECONDS));
connection.close();
assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
}
@Test(timeout = 60000)
public void testClientWithHeartbeatsStaysAlive() throws Exception {
final CountDownLatch disconnected = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
assertNotNull(connection);
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
disconnected.countDown();
}
});
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
connection.close();
assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
}
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
/**
* Tests that cover broker behavior when the client requests heartbeats
*/
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=0";
}
@Test(timeout = 60000)
public void testBrokerWitZeroIdleTimeDoesNotAdvertise() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
assertEquals(0, connection.getTransport().getRemoteIdleTimeout());
}
});
AmqpConnection connection = client.connect();
assertNotNull(connection);
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testBrokerSendsRequestedHeartbeats() throws Exception {
final CountDownLatch disconnected = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
disconnected.countDown();
}
});
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertFalse(disconnected.await(10, TimeUnit.SECONDS));
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
connection.close();
assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
/**
* Test broker behaviors around Idle timeout when the inactivity monitor is disabled.
*/
public class AmqpDisabledInactivityMonitorTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
@Override
protected String getAdditionalConfig() {
return "&transport.useInactivityMonitor=false&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;
}
@Test(timeout = 60000)
public void testBrokerDoesNotRequestIdleTimeout() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
assertEquals(0, connection.getTransport().getRemoteIdleTimeout());
}
});
AmqpConnection connection = client.connect();
assertNotNull(connection);
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testClientWithIdleTimeoutIsRejected() throws Exception {
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);
try {
connection.connect();
fail("Connection should be rejected when idle frames can't be met.");
} catch (Exception ex) {
}
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpConnectionListener;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test for idle timeout processing using SocketProxy to interrupt coms.
*/
public class AmqpSocketProxyIdleTimeoutTests extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
private SocketProxy socketProxy;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
socketProxy = new SocketProxy(super.getBrokerAmqpConnectionURI());
}
@Override
@After
public void tearDown() throws Exception {
if (socketProxy != null) {
socketProxy.close();
socketProxy = null;
}
super.tearDown();
}
@Override
public URI getBrokerAmqpConnectionURI() {
return socketProxy.getUrl();
}
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;
}
@Test(timeout = 60000)
public void testBrokerSendsRequestedHeartbeats() throws Exception {
final CountDownLatch disconnected = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
connection.setIdleTimeout(TEST_IDLE_TIMEOUT);
assertNotNull(connection);
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
disconnected.countDown();
}
});
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
socketProxy.pause();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
assertTrue(disconnected.await(10, TimeUnit.SECONDS));
socketProxy.goOn();
connection.close();
assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
}
@Test(timeout = 60000)
public void testClientWithoutHeartbeatsGetsDropped() throws Exception {
final CountDownLatch disconnected = new CountDownLatch(1);
AmqpClient client = createAmqpClient();
assertNotNull(client);
AmqpConnection connection = client.createConnection();
connection.setCloseTimeout(1000); // Socket will have silently gone away, don't wait to long.
assertNotNull(connection);
connection.setListener(new AmqpConnectionListener() {
@Override
public void onException(Throwable ex) {
disconnected.countDown();
}
});
connection.connect();
assertEquals(1, getProxyToBroker().getCurrentConnectionsCount());
socketProxy.pause();
// Client still sends ok but broker doesn't see them.
assertFalse(disconnected.await(5, TimeUnit.SECONDS));
socketProxy.halfClose();
assertTrue(disconnected.await(15, TimeUnit.SECONDS));
socketProxy.close();
connection.close();
assertTrue("Connection should get cleaned up.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 0;
}
}));
}
}