This closes #3323
This commit is contained in:
commit
f56215efc6
|
@ -247,7 +247,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
|||
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
|
||||
server.getRemotingService().addConnectionEntry(connection, entry);
|
||||
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
||||
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler()));
|
||||
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
|
||||
|
||||
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
||||
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
|
||||
|
|
|
@ -16,12 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.protocol.amqp.connect;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.group.ChannelGroup;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
|
||||
|
||||
/**
|
||||
* Common handler implementation for client and server side handler.
|
||||
|
@ -32,11 +35,17 @@ public class AMQPBrokerConnectionChannelHandler extends ChannelDuplexHandler {
|
|||
|
||||
private final ProtonHandler handler;
|
||||
|
||||
volatile boolean active;
|
||||
private final ClientConnectionLifeCycleListener listener;
|
||||
|
||||
protected AMQPBrokerConnectionChannelHandler(final ChannelGroup group, final ProtonHandler handler) {
|
||||
private final Executor listenerExecutor;
|
||||
|
||||
private boolean active = true;
|
||||
|
||||
protected AMQPBrokerConnectionChannelHandler(final ChannelGroup group, final ProtonHandler handler, ClientConnectionLifeCycleListener listener, Executor executor) {
|
||||
this.group = group;
|
||||
this.handler = handler;
|
||||
this.listener = listener;
|
||||
this.listenerExecutor = executor;
|
||||
}
|
||||
|
||||
protected static Object channelId(Channel channel) {
|
||||
|
@ -49,6 +58,17 @@ public class AMQPBrokerConnectionChannelHandler extends ChannelDuplexHandler {
|
|||
ctx.fireChannelActive();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
synchronized (this) {
|
||||
if (active) {
|
||||
listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel())));
|
||||
super.channelInactive(ctx);
|
||||
active = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue