This commit is contained in:
Clebert Suconic 2019-07-16 13:36:42 -04:00
commit 1ce7ca4af3
13 changed files with 446 additions and 38 deletions

View File

@ -16,11 +16,27 @@
*/
package org.apache.activemq.artemis.core.server;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
public interface ActiveMQComponent {
void start() throws Exception;
void stop() throws Exception;
default Future<?> asyncStop() {
CompletableFuture<?> future = new CompletableFuture<>();
try {
stop();
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
}
boolean isStarted();
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
/**
* Utility adapted from: org.apache.activemq.util.Wait
*/
public class Wait {
public static final long MAX_WAIT_MILLIS = 30 * 1000;
public static final int SLEEP_MILLIS = 1000;
public interface Condition {
boolean isSatisfied() throws Exception;
}
public interface LongCondition {
long getCount() throws Exception;
}
public interface IntCondition {
int getCount() throws Exception;
}
public static boolean waitFor(Condition condition) throws Exception {
return waitFor(condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(long size, LongCondition condition) throws Exception {
assertEquals(size, condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(long size, LongCondition condition, long timeout) throws Exception {
assertEquals(size, condition, timeout, SLEEP_MILLIS);
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
if (!result) {
Assert.fail(size + " != " + condition.getCount());
}
}
public static void assertEquals(int size, IntCondition condition) throws Exception {
assertEquals(size, condition, MAX_WAIT_MILLIS);
}
public static void assertEquals(int size, IntCondition condition, long timeout) throws Exception {
assertEquals(size, condition, timeout, SLEEP_MILLIS);
}
public static void assertEquals(int size, IntCondition condition, long timeout, long sleepMillis) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
if (!result) {
Assert.fail(size + " != " + condition.getCount());
}
}
public static void assertTrue(Condition condition) {
assertTrue("Condition wasn't met", condition);
}
public static void assertFalse(Condition condition) throws Exception {
assertTrue(() -> !condition.isSatisfied());
}
public static void assertFalse(String failureMessage, Condition condition) {
assertTrue(failureMessage, () -> !condition.isSatisfied());
}
public static void assertTrue(String failureMessage, Condition condition) {
assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
}
public static void assertTrue(String failureMessage, Condition condition, final long duration) {
assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
}
public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception {
assertTrue("condition not met", condition, duration, sleep);
}
public static void assertTrue(String failureMessage, Condition condition, final long duration, final long sleep) {
boolean result = waitFor(condition, duration, sleep);
if (!result) {
Assert.fail(failureMessage);
}
}
public static boolean waitFor(final Condition condition, final long duration) throws Exception {
return waitFor(condition, duration, SLEEP_MILLIS);
}
public static boolean waitFor(final Condition condition,
final long durationMillis,
final long sleepMillis) {
try {
final long expiry = System.currentTimeMillis() + durationMillis;
boolean conditionSatisified = condition.isSatisfied();
while (!conditionSatisified && System.currentTimeMillis() < expiry) {
if (sleepMillis == 0) {
Thread.yield();
} else {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
}
conditionSatisified = condition.isSatisfied();
}
return conditionSatisified;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -376,7 +376,9 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
long oldPosition = file.position();
file.open();
if (!file.isOpen()) {
file.open();
}
file.position(0);
for (;;) {

View File

@ -37,6 +37,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@ -221,6 +222,8 @@ public class NettyAcceptor extends AbstractAcceptor {
final Executor failureExecutor;
private Future<?> asyncStopFuture = null;
public NettyAcceptor(final String name,
final ClusterConnection clusterConnection,
final Map<String, Object> configuration,
@ -645,6 +648,13 @@ public class NettyAcceptor extends AbstractAcceptor {
return this.configuration;
}
@Override
public java.util.concurrent.Future<?> asyncStop() {
stop();
return asyncStopFuture;
}
@Override
public synchronized void stop() {
if (channelClazz == null) {
@ -685,7 +695,7 @@ public class NettyAcceptor extends AbstractAcceptor {
// Shutdown the EventLoopGroup if no new task was added for 100ms or if
// 3000ms elapsed.
eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
asyncStopFuture = eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
eventLoopGroup = null;
channelClazz = null;

View File

@ -32,6 +32,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@ -407,15 +408,27 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
conn.disconnect(criticalError);
}
Map<Acceptor, Future<?>> acceptorFutures = new HashMap<>();
for (Acceptor acceptor : acceptors.values()) {
try {
acceptor.stop();
acceptorFutures.put(acceptor, acceptor.asyncStop());
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
}
}
for (Map.Entry<Acceptor, Future<?>> acceptorFuture : acceptorFutures.entrySet()) {
if (acceptorFuture.getValue() != null) {
try {
acceptorFuture.getValue().get();
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptorFuture.getKey().getName());
}
}
}
acceptors.clear();
acceptorFutures.clear();
connections.clear();
connectionCountLatch.setCount(0);

View File

@ -1269,6 +1269,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.analyzer = null;
}
activateCallbacks.clear();
if (identity != null) {
ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), tempNodeID, getUptime());
} else {

View File

@ -171,6 +171,9 @@ public abstract class ActiveMQTestBase extends Assert {
@ClassRule
public static ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
@Rule
public NoProcessFilesBehind noProcessFilesBehind = new NoProcessFilesBehind(-1, 1000);
/** We should not under any circunstance create data outside of ./target
* if you have a test failing because because of this rule for any reason,
* even if you use afterClass events, move the test to ./target and always cleanup after
@ -276,6 +279,7 @@ public abstract class ActiveMQTestBase extends Assert {
@After
public void tearDown() throws Exception {
noProcessFilesBehind.tearDown();
closeAllSessionFactories();
closeAllServerLocatorsFactories();

View File

@ -0,0 +1,202 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.util;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.logging.Logger;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
/**
* This is useful to make sure you won't have leaking threads between tests
*/
public class NoProcessFilesBehind extends TestWatcher {
private static Logger log = Logger.getLogger(NoProcessFilesBehind.class);
/**
* -1 on maxVariance means no check
*/
public NoProcessFilesBehind(int maxVariance, long maxFiles) {
this.maxVariance = maxVariance;
this.maxFiles = maxFiles;
}
long fdBefore;
int maxVariance;
long maxFiles;
List<String> openFilesBefore;
List<String> openFilesAfter;
static OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
public static long getOpenFD() {
if (os instanceof UnixOperatingSystemMXBean) {
return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
} else {
return 0;
}
}
@Override
protected void starting(Description description) {
LibaioContext.isLoaded();
fdBefore = getOpenFD();
openFilesBefore = getOpenFiles(true);
}
public static List<String> getOpenFiles(boolean filtered) {
ArrayList<String> openFiles = new ArrayList<>();
try {
String outputLine;
int processId = getProcessId();
Process child = Runtime.getRuntime().exec("lsof -a -p " + processId + " -d ^txt,^mem,^cwd,^rtd,^DEL", new String[] {});
try (BufferedReader processInput = new BufferedReader(new InputStreamReader(child.getInputStream()))) {
processInput.readLine();
while ((outputLine = processInput.readLine()) != null) {
if (!filtered || (!outputLine.endsWith(".jar") && !outputLine.endsWith(".so") && !outputLine.contains("type=STREAM")))
openFiles.add(outputLine);
}
}
} catch (Exception ignore) {
}
return openFiles;
}
private static List<String> getDiffFiles(List<String> xOpenFiles, List<String> yOpenFiles) {
ArrayList<String> diffOpenFiles = new ArrayList<>();
for (String xOpenFile : xOpenFiles) {
boolean found = false;
Iterator<String> yOpenFilesIterator = yOpenFiles.iterator();
String xOpenFileS = xOpenFile.replaceAll("\\s+", "\\\\s+");
while (yOpenFilesIterator.hasNext() && !found) {
String yOpenFileS = yOpenFilesIterator.next().replaceAll("\\s+", "\\\\s+");
found = yOpenFileS.equals(xOpenFileS);
}
if (!found) {
diffOpenFiles.add(xOpenFile);
}
}
return diffOpenFiles;
}
private static int getProcessId() throws ReflectiveOperationException {
java.lang.management.RuntimeMXBean runtime = java.lang.management.ManagementFactory.getRuntimeMXBean();
java.lang.reflect.Field jvmField = runtime.getClass().getDeclaredField("jvm");
jvmField.setAccessible(true);
sun.management.VMManagement jvm = (sun.management.VMManagement) jvmField.get(runtime);
java.lang.reflect.Method getProcessIdMethod = jvm.getClass().getDeclaredMethod("getProcessId");
getProcessIdMethod.setAccessible(true);
return (Integer) getProcessIdMethod.invoke(jvm);
}
public void tearDown() {
openFilesAfter = getOpenFiles(true);
}
@Override
protected void failed(Throwable e, Description description) {
}
@Override
protected void succeeded(Description description) {
}
List<String> getVariance() {
long fdAfter = getOpenFD();
long variance = fdAfter - fdBefore;
if (variance > 0) {
List<String> currOpenFiles = getOpenFiles(true);
List<String> diffOpenFiles = getDiffFiles(currOpenFiles, openFilesBefore);
List<String> skippingOpenFiles = getDiffFiles(currOpenFiles, openFilesAfter);
List<String> leavingOpenFiles = getDiffFiles(diffOpenFiles, skippingOpenFiles);
return leavingOpenFiles;
} else {
return new ArrayList<>();
}
}
/**
* Override to tear down your specific external resource.
*/
@Override
protected void finished(Description description) {
long fdAfter = getOpenFD();
List<String> variance = getVariance();
if (variance.size() > 0) {
log.warn("test " + description.toString() + " is leaving " + variance.size() + " files open with a total number of files open = " + fdAfter);
System.err.println("test " + description.toString() + " is leaving " + variance.size() + " files open with a total number of files open = " + fdAfter);
for (String openFile : variance) {
System.err.println(openFile);
}
}
if (maxVariance > 0) {
VarianceCondition varianceCondition = new VarianceCondition();
Wait.assertTrue("The test " + description.toString() + " is leaving " + varianceCondition.getVarianceSize() + " files open, which is more than " + maxVariance + " max open", varianceCondition, 5000, 0);
}
Wait.assertTrue("Too many open files", () -> getOpenFD() < maxFiles, 5000, 0);
}
class VarianceCondition implements Wait.Condition {
private List<String> variance = null;
public long getVarianceSize() {
if (variance != null) {
return variance.size();
} else {
return 0;
}
}
@Override
public boolean isSatisfied() throws Exception {
variance = getVariance();
return variance.size() < maxVariance;
}
}
}

View File

@ -437,51 +437,51 @@ public class ConsumerTest extends ActiveMQTestBase {
ConnectionFactory factorySend = createFactory(protocolSender);
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
StringBuilder bufferLarge = new StringBuilder();
while (bufferLarge.length() < 100 * 1024) {
bufferLarge.append(" ");
}
final String bufferLargeContent = bufferLarge.toString();
Connection connection = factorySend.createConnection();
try (Connection connection = factorySend.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
try (MessageProducer producer = session.createProducer(queue)) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
StringBuffer bufferLarge = new StringBuffer();
while (bufferLarge.length() < 100 * 1024) {
bufferLarge.append(" ");
msg = session.createTextMessage(bufferLargeContent);
msg.setIntProperty("mycount", 1);
producer.send(msg);
}
}
msg = session.createTextMessage(bufferLarge.toString());
msg.setIntProperty("mycount", 1);
producer.send(msg);
connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(QUEUE.toString());
}
try (Connection connection = factoryConsume.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageConsumer consumer = session.createConsumer(queue);
try (MessageConsumer consumer = session.createConsumer(queue)) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("mycount"));
Assert.assertEquals(bufferLarge.toString(), message.getText());
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(1, message.getIntProperty("mycount"));
Assert.assertEquals(bufferLargeContent, message.getText());
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
} finally {
connection.close();
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
}
}
}
}

View File

@ -248,6 +248,8 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
pd.send(sn0.createTextMessage("hello " + i));
}
cn0.close();
// Messages should stay in node 1 and note get redistributed.
assertEquals(NUMBER_OF_MESSAGES, servers[0].locateQueue(queueName).getMessageCount());
assertEquals(0, servers[1].locateQueue(queueName).getMessageCount());
@ -371,6 +373,10 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
connection[node].close();
}
}
@Test
@ -451,6 +457,7 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
connection[1].close();
// this wil be after redistribution
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
connection[0].close();
}

View File

@ -57,6 +57,8 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.fusesource.hawtdispatch.DispatchPriority;
import org.fusesource.hawtdispatch.internal.DispatcherConfig;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
@ -75,6 +77,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
private static final Logger LOG = LoggerFactory.getLogger(MQTTTestSupport.class);
static {
DispatcherConfig.getDefaultDispatcher().getThreadQueues(DispatchPriority.DEFAULT);
}
protected int port = 1883;
protected ConnectionFactory cf;
protected LinkedList<Throwable> exceptions = new LinkedList<>();

View File

@ -105,6 +105,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
// await possible exceptions
Thread.sleep(1000);
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
executor.shutdown();
}
@Test

View File

@ -58,5 +58,6 @@ public class QueuePeristPauseTest extends ActiveMQTestBase {
Assert.assertFalse(queue.isPaused());
}
server.stop();
}
}