ARTEMIS-1134 Close connection if error caught during event processing

If an error escapes into the event processing layer we close the
connection with an error condition to avoid the client becoming stuck on
waiting for a response from the broker and the broker side being in an
unknown state.
This commit is contained in:
Timothy Bish 2017-04-28 16:21:51 -04:00 committed by Clebert Suconic
parent 6a251ee5c5
commit 970782d36a
1 changed files with 10 additions and 6 deletions

View File

@ -25,8 +25,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
@ -34,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
@ -43,6 +42,9 @@ import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class ProtonHandler extends ProtonInitializable {
private static final Logger log = Logger.getLogger(ProtonHandler.class);
@ -89,7 +91,6 @@ public class ProtonHandler extends ProtonInitializable {
connection.collect(collector);
}
public long tick(boolean firstTick) {
lock.lock();
try {
@ -141,7 +142,6 @@ public class ProtonHandler extends ProtonInitializable {
Thread.currentThread().interrupt();
return false;
}
}
public Transport getTransport() {
@ -168,7 +168,6 @@ public class ProtonHandler extends ProtonInitializable {
}
this.serverSasl.server();
serverSasl.setMechanisms(names);
}
public void flushBytes() {
@ -348,7 +347,12 @@ public class ProtonHandler extends ProtonInitializable {
Events.dispatch(ev, h);
} catch (Exception e) {
log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition());
ErrorCondition error = new ErrorCondition();
error.setCondition(AmqpError.INTERNAL_ERROR);
error.setDescription("Unrecoverable error: " +
(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
connection.setCondition(error);
connection.close();
}
}