From 86183d7b578f9d4c076c31785e9e2950abec8a58 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 30 Apr 2018 11:23:03 -0400 Subject: [PATCH 1/2] NO-JIRA Adding proper parameter on JdbcLeaseLockTest --- .../artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 84d3dd4f5a..2ecddbf3b9 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -46,18 +46,16 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { private DatabaseStorageConfiguration dbConf; private SQLProvider sqlProvider; - @Parameterized.Parameters(name = "create_tables_prior_test") + @Parameterized.Parameters(name = "create_tables_prior_test={0}") public static List data() { return Arrays.asList(new Object[][] { - {true, null}, - {false, null} + {true}, + {false} }); } @Parameter(0) public boolean withExistingTable; - @Parameter(1) - public Object result; private LeaseLock lock() { From e8104586b417a01de570847519cb6d4969cbc9ee Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 30 Apr 2018 10:25:38 -0400 Subject: [PATCH 2/2] ARTEMIS-1837 Replication Initial catchup deadlocks if clients disconnect This is fixing existing tests, so no more tests are needed. --- .../impl/netty/ActiveMQChannelHandler.java | 12 +++++++++--- .../core/remoting/impl/netty/NettyConnector.java | 7 ++++--- .../core/remoting/impl/netty/NettyAcceptor.java | 13 ++++++++++--- .../remoting/impl/netty/NettyAcceptorFactory.java | 4 +++- .../remoting/impl/netty/NettyAcceptorTest.java | 14 ++++++++------ 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index 1542b05297..d0d5c0e992 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; +import java.util.concurrent.Executor; + import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -41,12 +43,16 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { volatile boolean active; + private final Executor listenerExecutor; + protected ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, - final BaseConnectionLifeCycleListener listener) { + final BaseConnectionLifeCycleListener listener, + final Executor listenerExecutor) { this.group = group; this.handler = handler; this.listener = listener; + this.listenerExecutor = listenerExecutor; } @Override @@ -75,7 +81,7 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { public void channelInactive(final ChannelHandlerContext ctx) throws Exception { synchronized (this) { if (active) { - listener.connectionDestroyed(channelId(ctx.channel())); + listenerExecutor.execute(() -> listener.connectionDestroyed(channelId(ctx.channel()))); active = false; } @@ -98,7 +104,7 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { synchronized (listener) { try { - listener.connectionException(channelId(ctx.channel()), me); + listenerExecutor.execute(() -> listener.connectionException(channelId(ctx.channel()), me)); active = false; } catch (Exception ex) { ActiveMQClientLogger.LOGGER.errorCallingLifeCycleListener(ex); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 38289ec901..c946f9c33a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -588,7 +588,7 @@ public class NettyConnector extends AbstractConnector { protocolManager.addChannelHandlers(pipeline); - pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener())); + pipeline.addLast(new ActiveMQClientChannelHandler(channelGroup, handler, new Listener(), closeExecutor)); } }); @@ -830,8 +830,9 @@ public class NettyConnector extends AbstractConnector { ActiveMQClientChannelHandler(final ChannelGroup group, final BufferHandler handler, - final ClientConnectionLifeCycleListener listener) { - super(group, handler, listener); + final ClientConnectionLifeCycleListener listener, + final Executor executor) { + super(group, handler, listener, executor); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 5af3db76b5..ed1a9412d2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -216,15 +217,20 @@ public class NettyAcceptor extends AbstractAcceptor { final AtomicBoolean warningPrinted = new AtomicBoolean(false); + final Executor failureExecutor; + public NettyAcceptor(final String name, final ClusterConnection clusterConnection, final Map configuration, final BufferHandler handler, final ServerConnectionLifeCycleListener listener, final ScheduledExecutorService scheduledThreadPool, + final Executor failureExecutor, final Map protocolMap) { super(protocolMap); + this.failureExecutor = failureExecutor; + this.name = name; this.clusterConnection = clusterConnection; @@ -740,7 +746,7 @@ public class NettyAcceptor extends AbstractAcceptor { } public ConnectionCreator createConnectionCreator() { - return new ActiveMQServerChannelHandler(channelGroup, handler, new Listener()); + return new ActiveMQServerChannelHandler(channelGroup, handler, new Listener(), failureExecutor); } private static String getProtocols(Map protocolManager) { @@ -763,8 +769,9 @@ public class NettyAcceptor extends AbstractAcceptor { ActiveMQServerChannelHandler(final ChannelGroup group, final BufferHandler handler, - final ServerConnectionLifeCycleListener listener) { - super(group, handler, listener); + final ServerConnectionLifeCycleListener listener, + final Executor failureExecutor) { + super(group, handler, listener, failureExecutor); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java index 5628a7f34f..dbaa731854 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; +import org.apache.activemq.artemis.utils.actors.OrderedExecutor; public class NettyAcceptorFactory implements AcceptorFactory { @@ -38,6 +39,7 @@ public class NettyAcceptorFactory implements AcceptorFactory { final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, final Map protocolMap) { - return new NettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, protocolMap); + Executor failureExecutor = new OrderedExecutor(threadPool); + return new NettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, failureExecutor, protocolMap); } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java index fb4382e827..6206bf9657 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java @@ -18,9 +18,9 @@ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -42,6 +42,7 @@ import org.junit.Test; public class NettyAcceptorTest extends ActiveMQTestBase { private ScheduledExecutorService pool2; + private ExecutorService pool3; @Override @Before @@ -57,6 +58,10 @@ public class NettyAcceptorTest extends ActiveMQTestBase { try { ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); } finally { + + if (pool3 != null) + pool3.shutdown(); + if (pool2 != null) pool2.shutdownNow(); super.tearDown(); @@ -94,7 +99,8 @@ public class NettyAcceptorTest extends ActiveMQTestBase { } }; pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()); - NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap()); + pool3 = Executors.newSingleThreadExecutor(ActiveMQThreadFactory.defaultThreadFactory()); + NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, pool3, new HashMap()); addActiveMQComponent(acceptor); acceptor.start(); @@ -108,10 +114,6 @@ public class NettyAcceptorTest extends ActiveMQTestBase { acceptor.stop(); Assert.assertFalse(acceptor.isStarted()); ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); - - pool2.shutdown(); - - pool2.awaitTermination(1, TimeUnit.SECONDS); } }