diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index e3cb730d80..eb95decf87 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -25,8 +25,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; @@ -34,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -43,6 +42,9 @@ import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + public class ProtonHandler extends ProtonInitializable { private static final Logger log = Logger.getLogger(ProtonHandler.class); @@ -89,7 +91,6 @@ public class ProtonHandler extends ProtonInitializable { connection.collect(collector); } - public long tick(boolean firstTick) { lock.lock(); try { @@ -141,7 +142,6 @@ public class ProtonHandler extends ProtonInitializable { Thread.currentThread().interrupt(); return false; } - } public Transport getTransport() { @@ -168,7 +168,6 @@ public class ProtonHandler extends ProtonInitializable { } this.serverSasl.server(); serverSasl.setMechanisms(names); - } public void flushBytes() { @@ -348,7 +347,12 @@ public class ProtonHandler extends ProtonInitializable { Events.dispatch(ev, h); } catch (Exception e) { log.warn(e.getMessage(), e); - connection.setCondition(new ErrorCondition()); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + connection.setCondition(error); + connection.close(); } }