This commit is contained in:
Clebert Suconic 2019-08-21 21:04:04 -04:00
parent a057f82a87
commit b828de502c
1 changed files with 12 additions and 21 deletions

View File

@ -35,10 +35,13 @@ import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
@ -48,9 +51,11 @@ import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;
public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
// Constants -----------------------------------------------------
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String BROKER_URL = "tcp://localhost:61616?minLargeMessageSize=10000&HA=true&retryInterval=100&reconnectAttempts=20&producerWindowSize=10000";
private Connection connection;
@ -60,8 +65,6 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
private CountDownLatch reconnectionLatch;
private CountDownLatch allMessagesReceived;
private Context context;
private static final String QUEUE_NAME = ManualReconnectionToSingleServerTest.class.getSimpleName() + ".queue";
private static final int NUM = 20;
@ -84,8 +87,9 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
public void testExceptionListener() throws Exception {
connect();
ConnectionFactory cf = (ConnectionFactory) context.lookup("cf");
Destination dest = (Destination) context.lookup(QUEUE_NAME);
ConnectionFactory cf = new ActiveMQConnectionFactory(BROKER_URL);
Destination dest = (Destination) ActiveMQJMSClient.createQueue(QUEUE_NAME);
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = sess.createProducer(dest);
@ -98,10 +102,9 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
if (i == NUM / 2) {
conn.close();
server.stop();
Thread.sleep(5000);
server.start();
cf = (ConnectionFactory) context.lookup("cf");
dest = (Destination) context.lookup(QUEUE_NAME);
cf = new ActiveMQConnectionFactory(BROKER_URL);
dest = (Destination) ActiveMQJMSClient.createQueue(QUEUE_NAME);
conn = cf.createConnection();
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
prod = sess.createProducer(dest);
@ -133,13 +136,6 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory.class.getCanonicalName());
props.put("queue." + QUEUE_NAME, QUEUE_NAME);
props.put("connectionFactory.cf", "tcp://127.0.0.1:61616?retryInterval=1000&reconnectAttempts=-1");
context = new InitialContext(props);
server = createServer(false, createDefaultNettyConfig());
Configuration configuration = new ConfigurationImpl();
@ -184,16 +180,10 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
int retries = 0;
final int retryLimit = 1000;
try {
if (context == null) {
return;
}
Context initialContext = context;
Queue queue;
ConnectionFactory cf;
while (true) {
try {
queue = (Queue) initialContext.lookup(QUEUE_NAME);
cf = (ConnectionFactory) initialContext.lookup("cf");
cf = new ActiveMQConnectionFactory(BROKER_URL);
break;
} catch (Exception e) {
if (retries++ > retryLimit)
@ -205,6 +195,7 @@ public class ManualReconnectionToSingleServerTest extends ActiveMQTestBase {
connection = cf.createConnection();
connection.setExceptionListener(exceptionListener);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
consumer = session.createConsumer(queue);
consumer.setMessageListener(listener);
connection.start();