This commit is contained in:
Clebert Suconic 2017-05-02 13:01:29 -04:00
commit 03bce9a120
1 changed files with 10 additions and 6 deletions

View File

@ -25,8 +25,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
@ -34,6 +32,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
@ -43,6 +42,9 @@ import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.Transport;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
public class ProtonHandler extends ProtonInitializable { public class ProtonHandler extends ProtonInitializable {
private static final Logger log = Logger.getLogger(ProtonHandler.class); private static final Logger log = Logger.getLogger(ProtonHandler.class);
@ -89,7 +91,6 @@ public class ProtonHandler extends ProtonInitializable {
connection.collect(collector); connection.collect(collector);
} }
public long tick(boolean firstTick) { public long tick(boolean firstTick) {
lock.lock(); lock.lock();
try { try {
@ -141,7 +142,6 @@ public class ProtonHandler extends ProtonInitializable {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return false; return false;
} }
} }
public Transport getTransport() { public Transport getTransport() {
@ -168,7 +168,6 @@ public class ProtonHandler extends ProtonInitializable {
} }
this.serverSasl.server(); this.serverSasl.server();
serverSasl.setMechanisms(names); serverSasl.setMechanisms(names);
} }
public void flushBytes() { public void flushBytes() {
@ -348,7 +347,12 @@ public class ProtonHandler extends ProtonInitializable {
Events.dispatch(ev, h); Events.dispatch(ev, h);
} catch (Exception e) { } catch (Exception e) {
log.warn(e.getMessage(), e); log.warn(e.getMessage(), e);
connection.setCondition(new ErrorCondition()); ErrorCondition error = new ErrorCondition();
error.setCondition(AmqpError.INTERNAL_ERROR);
error.setDescription("Unrecoverable error: " +
(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage()));
connection.setCondition(error);
connection.close();
} }
} }