This commit is contained in:
Justin Bertram 2018-04-30 10:56:14 -05:00
commit 7bdd0fe170
6 changed files with 37 additions and 21 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.remoting.impl.netty; package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
@ -41,12 +43,16 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
volatile boolean active; volatile boolean active;
private final Executor listenerExecutor;
protected ActiveMQChannelHandler(final ChannelGroup group, protected ActiveMQChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final BaseConnectionLifeCycleListener<?> listener) { final BaseConnectionLifeCycleListener<?> listener,
final Executor listenerExecutor) {
this.group = group; this.group = group;
this.handler = handler; this.handler = handler;
this.listener = listener; this.listener = listener;
this.listenerExecutor = listenerExecutor;
} }
@Override @Override
@ -75,7 +81,7 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
public void channelInactive(final ChannelHandlerContext ctx) throws Exception { public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
synchronized (this) { synchronized (this) {
if (active) { if (active) {
listener.connectionDestroyed(channelId(ctx.channel())); listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel())));
active = false; active = false;
} }
@ -98,7 +104,7 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
synchronized (listener) { synchronized (listener) {
try { try {
listener.connectionException(channelId(ctx.channel()), me); listenerExecutor.execute(() -> listener.connectionException(channelId(ctx.channel()), me));
active = false; active = false;
} catch (Exception ex) { } catch (Exception ex) {
ActiveMQClientLogger.LOGGER.errorCallingLifeCycleListener(ex); ActiveMQClientLogger.LOGGER.errorCallingLifeCycleListener(ex);

View File

@ -588,7 +588,7 @@ public class NettyConnector extends AbstractConnector {
protocolManager.addChannelHandlers(pipeline); protocolManager.addChannelHandlers(pipeline);
pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener())); pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor));
} }
}); });
@ -830,8 +830,9 @@ public class NettyConnector extends AbstractConnector {
ActiveMQClientChannelHandler(final ChannelGroup group, ActiveMQClientChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final ClientConnectionLifeCycleListener listener) { final ClientConnectionLifeCycleListener listener,
super(group, handler, listener); final Executor executor) {
super(group, handler, listener, executor);
} }
} }

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -216,15 +217,20 @@ public class NettyAcceptor extends AbstractAcceptor {
final AtomicBoolean warningPrinted = new AtomicBoolean(false); final AtomicBoolean warningPrinted = new AtomicBoolean(false);
final Executor failureExecutor;
public NettyAcceptor(final String name, public NettyAcceptor(final String name,
final ClusterConnection clusterConnection, final ClusterConnection clusterConnection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ServerConnectionLifeCycleListener listener, final ServerConnectionLifeCycleListener listener,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final Executor failureExecutor,
final Map<String, ProtocolManager> protocolMap) { final Map<String, ProtocolManager> protocolMap) {
super(protocolMap); super(protocolMap);
this.failureExecutor = failureExecutor;
this.name = name; this.name = name;
this.clusterConnection = clusterConnection; this.clusterConnection = clusterConnection;
@ -740,7 +746,7 @@ public class NettyAcceptor extends AbstractAcceptor {
} }
public ConnectionCreator createConnectionCreator() { public ConnectionCreator createConnectionCreator() {
return new ActiveMQServerChannelHandler(channelGroup, handler, new Listener()); return new ActiveMQServerChannelHandler(channelGroup, handler, new Listener(), failureExecutor);
} }
private static String getProtocols(Map<String, ProtocolManager> protocolManager) { private static String getProtocols(Map<String, ProtocolManager> protocolManager) {
@ -763,8 +769,9 @@ public class NettyAcceptor extends AbstractAcceptor {
ActiveMQServerChannelHandler(final ChannelGroup group, ActiveMQServerChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final ServerConnectionLifeCycleListener listener) { final ServerConnectionLifeCycleListener listener,
super(group, handler, listener); final Executor failureExecutor) {
super(group, handler, listener, failureExecutor);
} }
@Override @Override

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
public class NettyAcceptorFactory implements AcceptorFactory { public class NettyAcceptorFactory implements AcceptorFactory {
@ -38,6 +39,7 @@ public class NettyAcceptorFactory implements AcceptorFactory {
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final Map<String, ProtocolManager> protocolMap) { final Map<String, ProtocolManager> protocolMap) {
return new NettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, protocolMap); Executor failureExecutor = new OrderedExecutor(threadPool);
return new NettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap);
} }
} }

View File

@ -46,18 +46,16 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
private DatabaseStorageConfiguration dbConf; private DatabaseStorageConfiguration dbConf;
private SQLProvider sqlProvider; private SQLProvider sqlProvider;
@Parameterized.Parameters(name = "create_tables_prior_test") @Parameterized.Parameters(name = "create_tables_prior_test={0}")
public static List<Object[]> data() { public static List<Object[]> data() {
return Arrays.asList(new Object[][] { return Arrays.asList(new Object[][] {
{true, null}, {true},
{false, null} {false}
}); });
} }
@Parameter(0) @Parameter(0)
public boolean withExistingTable; public boolean withExistingTable;
@Parameter(1)
public Object result;
private LeaseLock lock() { private LeaseLock lock() {

View File

@ -18,9 +18,9 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -42,6 +42,7 @@ import org.junit.Test;
public class NettyAcceptorTest extends ActiveMQTestBase { public class NettyAcceptorTest extends ActiveMQTestBase {
private ScheduledExecutorService pool2; private ScheduledExecutorService pool2;
private ExecutorService pool3;
@Override @Override
@Before @Before
@ -57,6 +58,10 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
try { try {
ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT);
} finally { } finally {
if (pool3 != null)
pool3.shutdown();
if (pool2 != null) if (pool2 != null)
pool2.shutdownNow(); pool2.shutdownNow();
super.tearDown(); super.tearDown();
@ -94,7 +99,8 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
} }
}; };
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap<String, ProtocolManager>()); pool3 = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory());
NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, pool3, new HashMap<String, ProtocolManager>());
addActiveMQComponent(acceptor); addActiveMQComponent(acceptor);
acceptor.start(); acceptor.start();
@ -108,10 +114,6 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
acceptor.stop(); acceptor.stop();
Assert.assertFalse(acceptor.isStarted()); Assert.assertFalse(acceptor.isStarted());
ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT);
pool2.shutdown();
pool2.awaitTermination(1, TimeUnit.SECONDS);
} }
} }