This closes #1046
This commit is contained in:
commit
cea51bee53
|
@ -101,7 +101,7 @@ public class ActiveMQScheduledComponentTest {
|
|||
|
||||
local.stop();
|
||||
|
||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
|
||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() <= 5 && count.get() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -206,9 +206,9 @@ public class EmbeddedActiveMQResource extends ExternalResource {
|
|||
protected void after() {
|
||||
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
|
||||
|
||||
super.after();
|
||||
|
||||
this.stop();
|
||||
|
||||
super.after();
|
||||
}
|
||||
|
||||
public boolean isUseDurableMessage() {
|
||||
|
|
|
@ -44,6 +44,9 @@ public class ActiveMQDynamicProducerResourceTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
|
|
@ -36,6 +36,9 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
|
|
@ -45,6 +45,9 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
|
|
@ -43,6 +43,9 @@ public class ActiveMQProducerResourceTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
|
|
@ -36,6 +36,12 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest {
|
|||
static final String TEST_QUEUE = "test.queue";
|
||||
static final String TEST_ADDRESS = "test.address";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
CoreQueueConfiguration queueConfiguration = new CoreQueueConfiguration().setAddress(TEST_ADDRESS).setName(TEST_QUEUE);
|
||||
Configuration customConfiguration = new ConfigurationImpl().setPersistenceEnabled(false).setSecurityEnabled(true).addQueueConfiguration(queueConfiguration);
|
||||
|
||||
|
|
|
@ -32,6 +32,12 @@ public class EmbeddedActiveMQResourceFileConfigurationTest {
|
|||
static final String TEST_QUEUE = "test.queue";
|
||||
static final String TEST_ADDRESS = "test.address";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource("embedded-artemis-server.xml");
|
||||
|
||||
@Rule
|
||||
|
|
|
@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
|
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
|
|||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-minimal-server.xml", "embedded-artemis-jms-only.xml");
|
||||
|
||||
@Rule
|
||||
|
|
|
@ -47,6 +47,10 @@ public class EmbeddedJMSResourceQueueTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||
|
|
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
|
|||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-jms-server.xml");
|
||||
|
||||
@Rule
|
||||
|
|
|
@ -54,6 +54,10 @@ public class EmbeddedJMSResourceTopicTest {
|
|||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||
|
|
|
@ -38,6 +38,11 @@ public class MultipleEmbeddedActiveMQResourcesTest {
|
|||
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedActiveMQResource serverOne = new EmbeddedActiveMQResource(0);
|
||||
|
||||
public EmbeddedActiveMQResource serverTwo = new EmbeddedActiveMQResource(1);
|
||||
|
|
|
@ -34,6 +34,11 @@ public class MultipleEmbeddedJMSResourcesTest {
|
|||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServerOne = new EmbeddedJMSResource(0);
|
||||
|
||||
public EmbeddedJMSResource jmsServerTwo = new EmbeddedJMSResource(1);
|
||||
|
|
|
@ -20,6 +20,8 @@ import javax.net.ssl.SSLContext;
|
|||
import javax.net.ssl.SSLEngine;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -43,7 +45,9 @@ import io.netty.handler.ssl.SslHandler;
|
|||
import io.netty.util.CharsetUtil;
|
||||
import org.apache.activemq.artemis.component.WebServerComponent;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.dto.WebServerDTO;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -54,12 +58,21 @@ public class WebServerComponentTest extends Assert {
|
|||
static final String SECURE_URL = System.getProperty("url", "https://localhost:8448/WebServerComponentTest.txt");
|
||||
private Bootstrap bootstrap;
|
||||
private EventLoopGroup group;
|
||||
private List<ActiveMQComponent> testedComponents;
|
||||
|
||||
@Before
|
||||
public void setupNetty() throws URISyntaxException {
|
||||
// Configure the client.
|
||||
group = new NioEventLoopGroup();
|
||||
bootstrap = new Bootstrap();
|
||||
testedComponents = new ArrayList<>();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (ActiveMQComponent c : testedComponents) {
|
||||
c.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -70,6 +83,7 @@ public class WebServerComponentTest extends Assert {
|
|||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -110,6 +124,7 @@ public class WebServerComponentTest extends Assert {
|
|||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
String keyStoreProvider = "JKS";
|
||||
|
@ -163,6 +178,7 @@ public class WebServerComponentTest extends Assert {
|
|||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
String keyStoreProvider = "JKS";
|
||||
|
|
|
@ -132,7 +132,7 @@ public class ReceiveTest extends ActiveMQTestBase {
|
|||
public void testReceiveImmediate() throws Exception {
|
||||
|
||||
// forces perfect round robin
|
||||
locator.setConsumerWindowSize(1);
|
||||
locator.setConsumerMaxRate(1);
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
ClientSession sendSession = cf.createSession(false, true, true);
|
||||
ClientProducer cp = sendSession.createProducer(addressA);
|
||||
|
|
|
@ -52,6 +52,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
|||
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
|
||||
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
|
||||
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
|
||||
private static final int NUMBER_OF_MESSAGES = 100;
|
||||
|
||||
public static void main(String[] arg) {
|
||||
if (arg.length != 3) {
|
||||
|
@ -77,7 +78,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
|||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
}
|
||||
|
||||
|
@ -125,8 +126,10 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
|||
Destination destination = session.createQueue(destinationName);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Assert.assertNotNull(consumer.receive(1000));
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
// give more time to receive first message but do not wait so long for last as all message were most likely sent
|
||||
Assert.assertNotNull("consumer.receive(...) returned null for " + i + "th message. Number of expected messages" +
|
||||
" to be received is " + NUMBER_OF_MESSAGES, i == NUMBER_OF_MESSAGES - 1 ? consumer.receive(500) : consumer.receive(5000));
|
||||
}
|
||||
} finally {
|
||||
connection.stop();
|
||||
|
@ -152,14 +155,14 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
|||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
int i = 0;
|
||||
for (; i < 100; i++) {
|
||||
Message msg = consumer.receive(100);
|
||||
for (; i < NUMBER_OF_MESSAGES; i++) {
|
||||
Message msg = consumer.receive(1000);
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(i < 100);
|
||||
Assert.assertTrue(i < NUMBER_OF_MESSAGES);
|
||||
} finally {
|
||||
connection.stop();
|
||||
connection.close();
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
|
||||
|
@ -336,6 +337,23 @@ public class FailoverTest extends FailoverTestBase {
|
|||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
|
||||
|
||||
final CountDownLatch connectionFailed = new CountDownLatch(1);
|
||||
|
||||
session.addFailureListener(new SessionFailureListener() {
|
||||
@Override
|
||||
public void beforeReconnect(ActiveMQException exception) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
||||
connectionFailed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
|
||||
|
||||
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
|
||||
|
@ -357,6 +375,7 @@ public class FailoverTest extends FailoverTestBase {
|
|||
session.commit(xid, false);
|
||||
} catch (XAException e) {
|
||||
//there is still an edge condition that we must deal with
|
||||
Assert.assertTrue(connectionFailed.await(10, TimeUnit.SECONDS));
|
||||
session.commit(xid, false);
|
||||
}
|
||||
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
|||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
|
@ -956,6 +957,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
|||
|
||||
session.commit();
|
||||
|
||||
// flush executors on queues so we can get precise number of messages
|
||||
Queue queue1 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||
queue1.flushExecutor();
|
||||
Queue queue2 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||
queue2.flushExecutor();
|
||||
|
||||
assertEquals(2, serverControl.getTotalMessageCount());
|
||||
|
||||
session.deleteQueue(random1);
|
||||
|
|
|
@ -90,7 +90,8 @@ public final class SpawnedVMSupport {
|
|||
final String... args) throws Exception {
|
||||
ProcessBuilder builder = new ProcessBuilder();
|
||||
final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
|
||||
builder.command(javaPath, memoryArg1, memoryArg2, "-cp", System.getProperty("java.class.path"));
|
||||
builder.command(javaPath, memoryArg1, memoryArg2);
|
||||
builder.environment().put("CLASSPATH", System.getProperty("java.class.path"));
|
||||
|
||||
List<String> commandList = builder.command();
|
||||
|
||||
|
|
Loading…
Reference in New Issue