ARTEMIS-2969 / ARTEMIS-2937 Dealing with instant disconnects on amqp broker connections

This commit is contained in:
Clebert Suconic 2020-10-30 09:30:56 -04:00
parent 1014db4ef7
commit f3dde91b91
2 changed files with 23 additions and 3 deletions

View File

@ -247,7 +247,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry); server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler())); connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session(); session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session); sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);

View File

@ -16,12 +16,15 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.connect; package org.apache.activemq.artemis.protocol.amqp.connect;
import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
/** /**
* Common handler implementation for client and server side handler. * Common handler implementation for client and server side handler.
@ -32,11 +35,17 @@ public class AMQPBrokerConnectionChannelHandler extends ChannelDuplexHandler {
private final ProtonHandler handler; private final ProtonHandler handler;
volatile boolean active; private final ClientConnectionLifeCycleListener listener;
protected AMQPBrokerConnectionChannelHandler(final ChannelGroup group, final ProtonHandler handler) { private final Executor listenerExecutor;
private boolean active = true;
protected AMQPBrokerConnectionChannelHandler(final ChannelGroup group, final ProtonHandler handler, ClientConnectionLifeCycleListener listener, Executor executor) {
this.group = group; this.group = group;
this.handler = handler; this.handler = handler;
this.listener = listener;
this.listenerExecutor = executor;
} }
protected static Object channelId(Channel channel) { protected static Object channelId(Channel channel) {
@ -49,6 +58,17 @@ public class AMQPBrokerConnectionChannelHandler extends ChannelDuplexHandler {
ctx.fireChannelActive(); ctx.fireChannelActive();
} }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
synchronized (this) {
if (active) {
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel())));
super.channelInactive(ctx);
active = false;
}
}
}
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
} }