This commit is contained in:
Clebert Suconic 2019-07-18 16:28:04 -04:00
commit 30ee0daa90
4 changed files with 26 additions and 26 deletions

View File

@ -16,26 +16,16 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public interface ActiveMQComponent { public interface ActiveMQComponent {
void start() throws Exception; void start() throws Exception;
void stop() throws Exception; void stop() throws Exception;
default Future<?> asyncStop() { default void asyncStop(Runnable callback) throws Exception {
CompletableFuture<?> future = new CompletableFuture<>(); stop();
try { callback.run();
stop();
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
} }
boolean isStarted(); boolean isStarted();

View File

@ -36,8 +36,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -222,8 +222,6 @@ public class NettyAcceptor extends AbstractAcceptor {
final Executor failureExecutor; final Executor failureExecutor;
private Future<?> asyncStopFuture = null;
public NettyAcceptor(final String name, public NettyAcceptor(final String name,
final ClusterConnection clusterConnection, final ClusterConnection clusterConnection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
@ -649,15 +647,18 @@ public class NettyAcceptor extends AbstractAcceptor {
} }
@Override @Override
public java.util.concurrent.Future<?> asyncStop() { public void stop() throws Exception {
stop(); CountDownLatch latch = new CountDownLatch(1);
return asyncStopFuture; asyncStop(latch::countDown);
latch.await();
} }
@Override @Override
public synchronized void stop() { public synchronized void asyncStop(Runnable callback) {
if (channelClazz == null) { if (channelClazz == null) {
callback.run();
return; return;
} }
@ -693,11 +694,6 @@ public class NettyAcceptor extends AbstractAcceptor {
} }
} }
// Shutdown the EventLoopGroup if no new task was added for 100ms or if
// 3000ms elapsed.
asyncStopFuture = eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
eventLoopGroup = null;
channelClazz = null; channelClazz = null;
for (Connection connection : connections.values()) { for (Connection connection : connections.values()) {
@ -720,6 +716,11 @@ public class NettyAcceptor extends AbstractAcceptor {
} }
paused = false; paused = false;
// Shutdown the EventLoopGroup if no new task was added for 100ms or if
// 3000ms elapsed.
eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS).addListener(f -> callback.run());
eventLoopGroup = null;
} }
@Override @Override

View File

@ -79,6 +79,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class); private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class);
private static final int ACCEPTOR_STOP_TIMEOUT = 3000;
// Attributes ---------------------------------------------------- // Attributes ----------------------------------------------------
private volatile boolean started = false; private volatile boolean started = false;
@ -407,13 +409,16 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
conn.disconnect(criticalError); conn.disconnect(criticalError);
} }
CountDownLatch acceptorCountDownLatch = new CountDownLatch(acceptors.size());
for (Acceptor acceptor : acceptors.values()) { for (Acceptor acceptor : acceptors.values()) {
try { try {
acceptor.stop(); acceptor.asyncStop(acceptorCountDownLatch::countDown);
} catch (Throwable t) { } catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName()); ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
} }
} }
//In some cases an acceptor stopping could be locked ie NettyAcceptor stopping could be locked by a network failure.
acceptorCountDownLatch.await(ACCEPTOR_STOP_TIMEOUT, TimeUnit.MILLISECONDS);
acceptors.clear(); acceptors.clear();

View File

@ -171,6 +171,9 @@ public abstract class ActiveMQTestBase extends Assert {
@ClassRule @ClassRule
public static ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule(); public static ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
@Rule
public NoProcessFilesBehind noProcessFilesBehind = new NoProcessFilesBehind(-1, 1000);
/** We should not under any circunstance create data outside of ./target /** We should not under any circunstance create data outside of ./target
* if you have a test failing because because of this rule for any reason, * if you have a test failing because because of this rule for any reason,
* even if you use afterClass events, move the test to ./target and always cleanup after * even if you use afterClass events, move the test to ./target and always cleanup after
@ -276,6 +279,7 @@ public abstract class ActiveMQTestBase extends Assert {
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
noProcessFilesBehind.tearDown();
closeAllSessionFactories(); closeAllSessionFactories();
closeAllServerLocatorsFactories(); closeAllServerLocatorsFactories();