ARTEMIS-995 Bulk of test fixes
This commit fixes tests:
ActiveMQScheduledComponentTest.testAccumulationOwnPool
PendingDeliveriesTest.testWithtReconnect
ReceiveTest.testReceiveImmediate
ActiveMQProducerResourceTest.testSendString
EmbeddedActiveMQResourceTest.testSendString
MultipleEmbeddedActiveMQResourcesTest.testMultipleServers
MultipleEmbeddedJMSResourcesTest.testMultipleServers
ActiveMQDynamicProducerResourceWithoutAddressTest.testSendString
ActiveMQDynamicProducerResourceWithoutAddressExceptionTest.testSendBytesToDefaultAddress
ActiveMQDynamicProducerResourceTest.testSendString
ActiveMQServerControlTest.testTotalMessageCount
EmbeddedActiveMQResourceCustomConfigurationTest.testCustomConfiguration
EmbeddedJMSResourceMultipleFileConfigurationTest.testConfiguration
EmbeddedJMSResourceQueueTest.testPushObjectMessage
EmbeddedJMSResourceSingleFileConfigurationTest.testConfiguration
EmbeddedActiveMQResourceFileConfigurationTest.testConfiguredQueue
EmbeddedJMSResourceTopicTest.testPushObjectMessage
LargeMessageFailoverTest.testTimeoutOnFailoverTransactionCommit
(cherry picked from commit 0c64cbfa4e
)
This commit is contained in:
parent
20d627f2b7
commit
a4b33bb33a
|
@ -101,7 +101,7 @@ public class ActiveMQScheduledComponentTest {
|
||||||
|
|
||||||
local.stop();
|
local.stop();
|
||||||
|
|
||||||
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() < 5 && count.get() > 0);
|
Assert.assertTrue("just because one took a lot of time, it doesn't mean we can accumulate many, we got " + count + " executions", count.get() <= 5 && count.get() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -206,9 +206,9 @@ public class EmbeddedActiveMQResource extends ExternalResource {
|
||||||
protected void after() {
|
protected void after() {
|
||||||
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
|
log.info("Stopping {}: {}", this.getClass().getSimpleName(), getServerName());
|
||||||
|
|
||||||
super.after();
|
|
||||||
|
|
||||||
this.stop();
|
this.stop();
|
||||||
|
|
||||||
|
super.after();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isUseDurableMessage() {
|
public boolean isUseDurableMessage() {
|
||||||
|
|
|
@ -44,6 +44,9 @@ public class ActiveMQDynamicProducerResourceTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||||
|
|
|
@ -36,6 +36,9 @@ public class ActiveMQDynamicProducerResourceWithoutAddressExceptionTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||||
|
|
|
@ -45,6 +45,9 @@ public class ActiveMQDynamicProducerResourceWithoutAddressTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||||
|
|
|
@ -43,6 +43,9 @@ public class ActiveMQProducerResourceTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||||
|
|
|
@ -36,6 +36,12 @@ public class EmbeddedActiveMQResourceCustomConfigurationTest {
|
||||||
static final String TEST_QUEUE = "test.queue";
|
static final String TEST_QUEUE = "test.queue";
|
||||||
static final String TEST_ADDRESS = "test.address";
|
static final String TEST_ADDRESS = "test.address";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
|
}
|
||||||
|
|
||||||
CoreQueueConfiguration queueConfiguration = new CoreQueueConfiguration().setAddress(TEST_ADDRESS).setName(TEST_QUEUE);
|
CoreQueueConfiguration queueConfiguration = new CoreQueueConfiguration().setAddress(TEST_ADDRESS).setName(TEST_QUEUE);
|
||||||
Configuration customConfiguration = new ConfigurationImpl().setPersistenceEnabled(false).setSecurityEnabled(true).addQueueConfiguration(queueConfiguration);
|
Configuration customConfiguration = new ConfigurationImpl().setPersistenceEnabled(false).setSecurityEnabled(true).addQueueConfiguration(queueConfiguration);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,12 @@ public class EmbeddedActiveMQResourceFileConfigurationTest {
|
||||||
static final String TEST_QUEUE = "test.queue";
|
static final String TEST_QUEUE = "test.queue";
|
||||||
static final String TEST_ADDRESS = "test.address";
|
static final String TEST_ADDRESS = "test.address";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
|
}
|
||||||
|
|
||||||
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource("embedded-artemis-server.xml");
|
private EmbeddedActiveMQResource server = new EmbeddedActiveMQResource("embedded-artemis-server.xml");
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
|
|
@ -45,6 +45,9 @@ public class EmbeddedActiveMQResourceTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
public EmbeddedActiveMQResource server = new EmbeddedActiveMQResource();
|
||||||
|
|
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceMultipleFileConfigurationTest {
|
||||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
|
}
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-minimal-server.xml", "embedded-artemis-jms-only.xml");
|
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-minimal-server.xml", "embedded-artemis-jms-only.xml");
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
|
|
@ -47,6 +47,10 @@ public class EmbeddedJMSResourceQueueTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
}
|
}
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||||
|
|
|
@ -43,6 +43,12 @@ public class EmbeddedJMSResourceSingleFileConfigurationTest {
|
||||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
|
}
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-jms-server.xml");
|
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource("embedded-artemis-jms-server.xml");
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
|
|
|
@ -54,6 +54,10 @@ public class EmbeddedJMSResourceTopicTest {
|
||||||
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
TEST_PROPERTIES = new HashMap<String, Object>(2);
|
||||||
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
TEST_PROPERTIES.put("PropertyOne", "Property Value 1");
|
||||||
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
TEST_PROPERTIES.put("PropertyTwo", "Property Value 2");
|
||||||
|
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("SeedGenerator Thread");
|
||||||
}
|
}
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
public EmbeddedJMSResource jmsServer = new EmbeddedJMSResource();
|
||||||
|
|
|
@ -38,6 +38,11 @@ public class MultipleEmbeddedActiveMQResourcesTest {
|
||||||
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
|
static final String ASSERT_RECEIVED_FORMAT = "Message should have been received from %s";
|
||||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
|
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in queue %s";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
}
|
||||||
|
|
||||||
public EmbeddedActiveMQResource serverOne = new EmbeddedActiveMQResource(0);
|
public EmbeddedActiveMQResource serverOne = new EmbeddedActiveMQResource(0);
|
||||||
|
|
||||||
public EmbeddedActiveMQResource serverTwo = new EmbeddedActiveMQResource(1);
|
public EmbeddedActiveMQResource serverTwo = new EmbeddedActiveMQResource(1);
|
||||||
|
|
|
@ -34,6 +34,11 @@ public class MultipleEmbeddedJMSResourcesTest {
|
||||||
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
static final String ASSERT_PUSHED_FORMAT = "Message should have been pushed a message to %s";
|
||||||
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
static final String ASSERT_COUNT_FORMAT = "Unexpected message count in destination %s";
|
||||||
|
|
||||||
|
static {
|
||||||
|
ThreadLeakCheckRule.addKownThread("MemoryPoolMXBean notification dispatcher");
|
||||||
|
ThreadLeakCheckRule.addKownThread("threadDeathWatcher");
|
||||||
|
}
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServerOne = new EmbeddedJMSResource(0);
|
public EmbeddedJMSResource jmsServerOne = new EmbeddedJMSResource(0);
|
||||||
|
|
||||||
public EmbeddedJMSResource jmsServerTwo = new EmbeddedJMSResource(1);
|
public EmbeddedJMSResource jmsServerTwo = new EmbeddedJMSResource(1);
|
||||||
|
|
|
@ -20,6 +20,8 @@ import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLEngine;
|
import javax.net.ssl.SSLEngine;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -43,7 +45,9 @@ import io.netty.handler.ssl.SslHandler;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
import org.apache.activemq.artemis.component.WebServerComponent;
|
import org.apache.activemq.artemis.component.WebServerComponent;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.dto.WebServerDTO;
|
import org.apache.activemq.artemis.dto.WebServerDTO;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -54,12 +58,21 @@ public class WebServerComponentTest extends Assert {
|
||||||
static final String SECURE_URL = System.getProperty("url", "https://localhost:8448/WebServerComponentTest.txt");
|
static final String SECURE_URL = System.getProperty("url", "https://localhost:8448/WebServerComponentTest.txt");
|
||||||
private Bootstrap bootstrap;
|
private Bootstrap bootstrap;
|
||||||
private EventLoopGroup group;
|
private EventLoopGroup group;
|
||||||
|
private List<ActiveMQComponent> testedComponents;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setupNetty() throws URISyntaxException {
|
public void setupNetty() throws URISyntaxException {
|
||||||
// Configure the client.
|
// Configure the client.
|
||||||
group = new NioEventLoopGroup();
|
group = new NioEventLoopGroup();
|
||||||
bootstrap = new Bootstrap();
|
bootstrap = new Bootstrap();
|
||||||
|
testedComponents = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
for (ActiveMQComponent c : testedComponents) {
|
||||||
|
c.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -70,6 +83,7 @@ public class WebServerComponentTest extends Assert {
|
||||||
WebServerComponent webServerComponent = new WebServerComponent();
|
WebServerComponent webServerComponent = new WebServerComponent();
|
||||||
Assert.assertFalse(webServerComponent.isStarted());
|
Assert.assertFalse(webServerComponent.isStarted());
|
||||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||||
|
testedComponents.add(webServerComponent);
|
||||||
webServerComponent.start();
|
webServerComponent.start();
|
||||||
// Make the connection attempt.
|
// Make the connection attempt.
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
@ -110,6 +124,7 @@ public class WebServerComponentTest extends Assert {
|
||||||
WebServerComponent webServerComponent = new WebServerComponent();
|
WebServerComponent webServerComponent = new WebServerComponent();
|
||||||
Assert.assertFalse(webServerComponent.isStarted());
|
Assert.assertFalse(webServerComponent.isStarted());
|
||||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||||
|
testedComponents.add(webServerComponent);
|
||||||
webServerComponent.start();
|
webServerComponent.start();
|
||||||
// Make the connection attempt.
|
// Make the connection attempt.
|
||||||
String keyStoreProvider = "JKS";
|
String keyStoreProvider = "JKS";
|
||||||
|
@ -163,6 +178,7 @@ public class WebServerComponentTest extends Assert {
|
||||||
WebServerComponent webServerComponent = new WebServerComponent();
|
WebServerComponent webServerComponent = new WebServerComponent();
|
||||||
Assert.assertFalse(webServerComponent.isStarted());
|
Assert.assertFalse(webServerComponent.isStarted());
|
||||||
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
webServerComponent.configure(webServerDTO, "./src/test/resources/", "./src/test/resources/");
|
||||||
|
testedComponents.add(webServerComponent);
|
||||||
webServerComponent.start();
|
webServerComponent.start();
|
||||||
// Make the connection attempt.
|
// Make the connection attempt.
|
||||||
String keyStoreProvider = "JKS";
|
String keyStoreProvider = "JKS";
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class ReceiveTest extends ActiveMQTestBase {
|
||||||
public void testReceiveImmediate() throws Exception {
|
public void testReceiveImmediate() throws Exception {
|
||||||
|
|
||||||
// forces perfect round robin
|
// forces perfect round robin
|
||||||
locator.setConsumerWindowSize(1);
|
locator.setConsumerMaxRate(1);
|
||||||
ClientSessionFactory cf = createSessionFactory(locator);
|
ClientSessionFactory cf = createSessionFactory(locator);
|
||||||
ClientSession sendSession = cf.createSession(false, true, true);
|
ClientSession sendSession = cf.createSession(false, true, true);
|
||||||
ClientProducer cp = sendSession.createProducer(addressA);
|
ClientProducer cp = sendSession.createProducer(addressA);
|
||||||
|
|
|
@ -52,6 +52,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||||
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
|
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
|
||||||
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
|
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
|
||||||
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
|
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
|
||||||
|
private static final int NUMBER_OF_MESSAGES = 100;
|
||||||
|
|
||||||
public static void main(String[] arg) {
|
public static void main(String[] arg) {
|
||||||
if (arg.length != 3) {
|
if (arg.length != 3) {
|
||||||
|
@ -77,7 +78,7 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
producer.send(session.createTextMessage("hello"));
|
producer.send(session.createTextMessage("hello"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,8 +126,10 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||||
Destination destination = session.createQueue(destinationName);
|
Destination destination = session.createQueue(destinationName);
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
Assert.assertNotNull(consumer.receive(1000));
|
// give more time to receive first message but do not wait so long for last as all message were most likely sent
|
||||||
|
Assert.assertNotNull("consumer.receive(...) returned null for " + i + "th message. Number of expected messages" +
|
||||||
|
" to be received is " + NUMBER_OF_MESSAGES, i == NUMBER_OF_MESSAGES - 1 ? consumer.receive(500) : consumer.receive(5000));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
connection.stop();
|
connection.stop();
|
||||||
|
@ -152,14 +155,14 @@ public class PendingDeliveriesTest extends ClientTestBase {
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (; i < 100; i++) {
|
for (; i < NUMBER_OF_MESSAGES; i++) {
|
||||||
Message msg = consumer.receive(100);
|
Message msg = consumer.receive(1000);
|
||||||
if (msg == null) {
|
if (msg == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue(i < 100);
|
Assert.assertTrue(i < NUMBER_OF_MESSAGES);
|
||||||
} finally {
|
} finally {
|
||||||
connection.stop();
|
connection.stop();
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
||||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
|
import org.apache.activemq.artemis.core.server.cluster.ha.BackupPolicy;
|
||||||
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
|
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
|
||||||
|
@ -336,6 +337,23 @@ public class FailoverTest extends FailoverTestBase {
|
||||||
|
|
||||||
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
|
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
|
||||||
|
|
||||||
|
final CountDownLatch connectionFailed = new CountDownLatch(1);
|
||||||
|
|
||||||
|
session.addFailureListener(new SessionFailureListener() {
|
||||||
|
@Override
|
||||||
|
public void beforeReconnect(ActiveMQException exception) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
||||||
|
connectionFailed.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
|
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
|
||||||
|
|
||||||
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
|
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
|
||||||
|
@ -357,6 +375,7 @@ public class FailoverTest extends FailoverTestBase {
|
||||||
session.commit(xid, false);
|
session.commit(xid, false);
|
||||||
} catch (XAException e) {
|
} catch (XAException e) {
|
||||||
//there is still an edge condition that we must deal with
|
//there is still an edge condition that we must deal with
|
||||||
|
Assert.assertTrue(connectionFailed.await(10, TimeUnit.SECONDS));
|
||||||
session.commit(xid, false);
|
session.commit(xid, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.security.Role;
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||||
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||||
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
|
||||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||||
|
@ -956,6 +957,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
|
||||||
|
|
||||||
session.commit();
|
session.commit();
|
||||||
|
|
||||||
|
// flush executors on queues so we can get precise number of messages
|
||||||
|
Queue queue1 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||||
|
queue1.flushExecutor();
|
||||||
|
Queue queue2 = server.locateQueue(SimpleString.toSimpleString(random1));
|
||||||
|
queue2.flushExecutor();
|
||||||
|
|
||||||
assertEquals(2, serverControl.getTotalMessageCount());
|
assertEquals(2, serverControl.getTotalMessageCount());
|
||||||
|
|
||||||
session.deleteQueue(random1);
|
session.deleteQueue(random1);
|
||||||
|
|
|
@ -90,7 +90,8 @@ public final class SpawnedVMSupport {
|
||||||
final String... args) throws Exception {
|
final String... args) throws Exception {
|
||||||
ProcessBuilder builder = new ProcessBuilder();
|
ProcessBuilder builder = new ProcessBuilder();
|
||||||
final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
|
final String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java").toAbsolutePath().toString();
|
||||||
builder.command(javaPath, memoryArg1, memoryArg2, "-cp", System.getProperty("java.class.path"));
|
builder.command(javaPath, memoryArg1, memoryArg2);
|
||||||
|
builder.environment().put("CLASSPATH", System.getProperty("java.class.path"));
|
||||||
|
|
||||||
List<String> commandList = builder.command();
|
List<String> commandList = builder.command();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue