diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 9bbed32129..baa747fc43 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -270,11 +270,13 @@ class AmqpProtocolConverter { public void onAMQPException(IOException error) { closedSocket = true; - if( !closing) { - System.out.println("AMQP client disconnected"); - error.printStackTrace(); + if( !closing ) { + amqpTransport.sendToActiveMQ(error); } else { - doClose(); + try { + amqpTransport.stop(); + } catch (Exception ignore) { + } } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java index 50d50853ec..ba2a6ac467 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -29,6 +29,8 @@ public interface AmqpTransport { public void sendToActiveMQ(Command command); + public void sendToActiveMQ(IOException command); + public void sendToAmqp(Object command) throws IOException; public X509Certificate[] getPeerCertificates(); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index 75e5b49fd0..6226626c08 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -72,18 +72,18 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor @Override public void onException(IOException error) { + protocolConverter.lock.lock(); try { - protocolConverter.lock.lock(); - try { - protocolConverter.onAMQPException(error); - } finally { - protocolConverter.lock.unlock(); - } + protocolConverter.onAMQPException(error); } finally { - super.onException(error); + protocolConverter.lock.unlock(); } } + public void sendToActiveMQ(IOException error) { + super.onException(error); + } + public void onCommand(Object command) { try { if (trace) {