From 970782d36af7a3ba5cc52fb73fe5718fc0f34021 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 28 Apr 2017 16:21:51 -0400 Subject: [PATCH] ARTEMIS-1134 Close connection if error caught during event processing If an error escapes into the event processing layer we close the connection with an error condition to avoid the client becoming stuck on waiting for a response from the broker and the broker side being in an unknown state. --- .../amqp/proton/handler/ProtonHandler.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index e3cb730d80..eb95decf87 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -25,8 +25,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; @@ -34,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; @@ -43,6 +42,9 @@ import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + public class ProtonHandler extends ProtonInitializable { private static final Logger log = Logger.getLogger(ProtonHandler.class); @@ -89,7 +91,6 @@ public class ProtonHandler extends ProtonInitializable { connection.collect(collector); } - public long tick(boolean firstTick) { lock.lock(); try { @@ -141,7 +142,6 @@ public class ProtonHandler extends ProtonInitializable { Thread.currentThread().interrupt(); return false; } - } public Transport getTransport() { @@ -168,7 +168,6 @@ public class ProtonHandler extends ProtonInitializable { } this.serverSasl.server(); serverSasl.setMechanisms(names); - } public void flushBytes() { @@ -348,7 +347,12 @@ public class ProtonHandler extends ProtonInitializable { Events.dispatch(ev, h); } catch (Exception e) { log.warn(e.getMessage(), e); - connection.setCondition(new ErrorCondition()); + ErrorCondition error = new ErrorCondition(); + error.setCondition(AmqpError.INTERNAL_ERROR); + error.setDescription("Unrecoverable error: " + + (e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage())); + connection.setCondition(error); + connection.close(); } }