mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-03-06 01:10:14 +00:00
This closes #1044
This commit is contained in:
commit
229797d15a
@ -101,7 +101,7 @@ public class ActiveMQScheduledComponentTest {
|
||||
|
||||
local.stop();
|
||||
|
||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
|
||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() <= 5 && count.get() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -207,9 +207,9 @@ public class EmbeddedActiveMQResource extends ExternalResource {
|
||||
protected void after() {
|
||||
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
|
||||
|
||||
super.after();
|
||||
|
||||
this.stop();
|
||||
|
||||
super.after();
|
||||
}
|
||||
|
||||
public boolean isUseDurableMessage() {
|
||||
|
@ -68,6 +68,8 @@ public class ActiveMQConsumerResourceTest {
|
||||
|
||||
ClientMessage received = consumer.receiveMessage();
|
||||
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -44,6 +44,9 @@ public class ActiveMQDynamicProducerResourceTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
@ -74,6 +77,8 @@ public class ActiveMQDynamicProducerResourceTest {
|
||||
|
||||
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
|
||||
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -100,4 +105,4 @@ public class ActiveMQDynamicProducerResourceTest {
|
||||
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -36,12 +37,20 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
|
||||
ActiveMQDynamicProducerResource producer = new ActiveMQDynamicProducerResource(server.getVmURL());
|
||||
|
||||
@After
|
||||
public void tear() {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Rule
|
||||
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
|
||||
|
||||
@ -72,4 +81,4 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
|
||||
public void testSendStringAndPropertiesToDefaultAddress() throws Exception {
|
||||
sentOne = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -45,6 +45,9 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
@ -82,6 +85,8 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
|
||||
|
||||
ClientMessage receivedTwo = server.receiveMessage(TEST_QUEUE_TWO);
|
||||
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedTwo);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -108,4 +113,4 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
|
||||
sentTwo = producer.sendMessage(TEST_QUEUE_TWO, TEST_BODY, TEST_PROPERTIES);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,9 @@ public class ActiveMQProducerResourceTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
@ -52,6 +55,7 @@ public class ActiveMQProducerResourceTest {
|
||||
@Rule
|
||||
public RuleChain ruleChain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server).around(producer);
|
||||
|
||||
|
||||
ClientMessage sent = null;
|
||||
|
||||
@After
|
||||
@ -82,4 +86,4 @@ public class ActiveMQProducerResourceTest {
|
||||
sent = producer.sendMessage(TEST_BODY, TEST_PROPERTIES);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.RuleChain;
|
||||
@ -36,6 +37,12 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest {
|
||||
static final String TEST_QUEUE = "test.queue";
|
||||
static final String TEST_ADDRESS = "test.address";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
CoreQueueConfiguration queueConfiguration = new CoreQueueConfiguration().setAddress(TEST_ADDRESS).setName(TEST_QUEUE);
|
||||
Configuration customConfiguration = new ConfigurationImpl().setPersistenceEnabled(false).setSecurityEnabled(true).addQueueConfiguration(queueConfiguration);
|
||||
|
||||
@ -44,6 +51,11 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest {
|
||||
@Rule
|
||||
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server);
|
||||
|
||||
@After
|
||||
public void tear() {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomConfiguration() throws Exception {
|
||||
Configuration configuration = server.getServer().getActiveMQServer().getConfiguration();
|
||||
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.junit;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.RuleChain;
|
||||
@ -32,11 +33,22 @@ public class EmbeddedActiveMQResourceFileConfigurationTest {
|
||||
static final String TEST_QUEUE = "test.queue";
|
||||
static final String TEST_ADDRESS = "test.address";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource("embedded-artemis-server.xml");
|
||||
|
||||
@Rule
|
||||
public RuleChain rulechain = RuleChain.outerRule(new ThreadLeakCheckRule()).around(server);
|
||||
|
||||
@After
|
||||
public void tear() {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfiguredQueue() throws Exception {
|
||||
assertNotNull(TEST_QUEUE + " should exist", server.locateQueue(TEST_QUEUE));
|
||||
|
@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||
@ -72,6 +75,8 @@ public class EmbeddedActiveMQResourceTest {
|
||||
|
||||
ClientMessage received = server.receiveMessage(TEST_QUEUE);
|
||||
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_ADDRESS), received);
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -94,4 +99,4 @@ public class EmbeddedActiveMQResourceTest {
|
||||
sent = server.sendMessageWithProperties(TEST_ADDRESS, TEST_BODY, TEST_PROPERTIES);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
|
||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-minimal-server.xml", "embedded-artemis-jms-only.xml");
|
||||
|
||||
@Rule
|
||||
|
@ -47,6 +47,10 @@ public class EmbeddedJMSResourceQueueTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
|
||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-jms-server.xml");
|
||||
|
||||
@Rule
|
||||
|
@ -54,6 +54,10 @@ public class EmbeddedJMSResourceTopicTest {
|
||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.junit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
@ -38,6 +39,11 @@ public class MultipleEmbeddedActiveMQResourcesTest {
|
||||
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedActiveMQResource serverOne = new EmbeddedActiveMQResource(0);
|
||||
|
||||
public EmbeddedActiveMQResource serverTwo = new EmbeddedActiveMQResource(1);
|
||||
@ -51,6 +57,12 @@ public class MultipleEmbeddedActiveMQResourcesTest {
|
||||
serverTwo.createQueue(TEST_ADDRESS_TWO, TEST_QUEUE_TWO);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
serverOne.stop();
|
||||
serverTwo.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleServers() throws Exception {
|
||||
ClientMessage sentOne = serverOne.sendMessage(TEST_ADDRESS_ONE, TEST_BODY);
|
||||
@ -66,4 +78,4 @@ public class MultipleEmbeddedActiveMQResourcesTest {
|
||||
assertNotNull(String.format(ASSERT_RECEIVED_FORMAT, TEST_QUEUE_TWO), receivedOne);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,11 @@ public class MultipleEmbeddedJMSResourcesTest {
|
||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||
|
||||
static {
|
||||
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||
}
|
||||
|
||||
public EmbeddedJMSResource jmsServerOne = new EmbeddedJMSResource(0);
|
||||
|
||||
public EmbeddedJMSResource jmsServerTwo = new EmbeddedJMSResource(1);
|
||||
|
@ -20,6 +20,8 @@ import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -43,7 +45,9 @@ import io.netty.handler.ssl.SslHandler;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import org.apache.activemq.artemis.component.WebServerComponent;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.dto.WebServerDTO;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -54,12 +58,21 @@ public class WebServerComponentTest extends Assert {
|
||||
static final String SECURE_URL = System.getProperty("url", "https://localhost:8448/WebServerComponentTest.txt");
|
||||
private Bootstrap bootstrap;
|
||||
private EventLoopGroup group;
|
||||
private List<ActiveMQComponent> testedComponents;
|
||||
|
||||
@Before
|
||||
public void setupNetty() throws URISyntaxException {
|
||||
// Configure the client.
|
||||
group = new NioEventLoopGroup();
|
||||
bootstrap = new Bootstrap();
|
||||
testedComponents = new ArrayList<>();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (ActiveMQComponent c : testedComponents) {
|
||||
c.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -70,6 +83,7 @@ public class WebServerComponentTest extends Assert {
|
||||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
@ -152,6 +166,7 @@ public class WebServerComponentTest extends Assert {
|
||||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
String keyStoreProvider = "JKS";
|
||||
@ -205,6 +220,7 @@ public class WebServerComponentTest extends Assert {
|
||||
WebServerComponent webServerComponent = new WebServerComponent();
|
||||
Assert.assertFalse(webServerComponent.isStarted());
|
||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||
testedComponents.add(webServerComponent);
|
||||
webServerComponent.start();
|
||||
// Make the connection attempt.
|
||||
String keyStoreProvider = "JKS";
|
||||
|
@ -133,7 +133,7 @@ public class ReceiveTest extends ActiveMQTestBase {
|
||||
public void testReceiveImmediate() throws Exception {
|
||||
|
||||
// forces perfect round robin
|
||||
locator.setConsumerWindowSize(1);
|
||||
locator.setConsumerMaxRate(1);
|
||||
ClientSessionFactory cf = createSessionFactory(locator);
|
||||
ClientSession sendSession = cf.createSession(false, true, true);
|
||||
ClientProducer cp = sendSession.createProducer(addressA);
|
||||
|
@ -55,6 +55,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
|
||||
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
|
||||
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
|
||||
private static final int NUMBER_OF_MESSAGES = 100;
|
||||
|
||||
public static void main(String[] arg) {
|
||||
if (arg.length != 3) {
|
||||
@ -80,7 +81,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
producer.send(session.createTextMessage("hello"));
|
||||
}
|
||||
|
||||
@ -128,8 +129,10 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||
Destination destination = session.createQueue(destinationName);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
Assert.assertNotNull(consumer.receive(1000));
|
||||
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||
// give more time to receive first message but do not wait so long for last as all message were most likely sent
|
||||
Assert.assertNotNull("consumer.receive(...) returned null for " + i + "th message. Number of expected messages" +
|
||||
" to be received is " + NUMBER_OF_MESSAGES, i == NUMBER_OF_MESSAGES - 1 ? consumer.receive(500) : consumer.receive(5000));
|
||||
}
|
||||
} finally {
|
||||
connection.stop();
|
||||
@ -155,14 +158,14 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
||||
int i = 0;
|
||||
for (; i < 100; i++) {
|
||||
Message msg = consumer.receive(100);
|
||||
for (; i < NUMBER_OF_MESSAGES; i++) {
|
||||
Message msg = consumer.receive(1000);
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(i < 100);
|
||||
Assert.assertTrue(i < NUMBER_OF_MESSAGES);
|
||||
} finally {
|
||||
connection.stop();
|
||||
connection.close();
|
||||
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
|
||||
@ -337,6 +338,23 @@ public class FailoverTest extends FailoverTestBase {
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
|
||||
|
||||
final CountDownLatch connectionFailed = new CountDownLatch(1);
|
||||
|
||||
session.addFailureListener(new SessionFailureListener() {
|
||||
@Override
|
||||
public void beforeReconnect(ActiveMQException exception) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
||||
connectionFailed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
|
||||
|
||||
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
|
||||
@ -358,6 +376,7 @@ public class FailoverTest extends FailoverTestBase {
|
||||
session.commit(xid, false);
|
||||
} catch (XAException e) {
|
||||
//there is still an edge condition that we must deal with
|
||||
Assert.assertTrue(connectionFailed.await(10, TimeUnit.SECONDS));
|
||||
session.commit(xid, false);
|
||||
}
|
||||
|
||||
|
@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
@ -1019,6 +1020,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||
|
||||
session.commit();
|
||||
|
||||
// flush executors on queues so we can get precise number of messages
|
||||
Queue queue1 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||
queue1.flushExecutor();
|
||||
Queue queue2 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||
queue2.flushExecutor();
|
||||
|
||||
assertEquals(2, serverControl.getTotalMessageCount());
|
||||
|
||||
session.deleteQueue(random1);
|
||||
|
@ -90,7 +90,8 @@ public final class SpawnedVMSupport {
|
||||
final String... args) throws Exception {
|
||||
ProcessBuilder builder = new ProcessBuilder();
|
||||
final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
|
||||
builder.command(javaPath, memoryArg1, memoryArg2, "-cp", System.getProperty("java.class.path"));
|
||||
builder.command(javaPath, memoryArg1, memoryArg2);
|
||||
builder.environment().put("CLASSPATH", System.getProperty("java.class.path"));
|
||||
|
||||
List<String> commandList = builder.command();
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user